From 0550659f5ffa9c8b92529d980c75144b35e8864a Mon Sep 17 00:00:00 2001 From: Crag Wolfe Date: Wed, 21 Jun 2017 20:04:01 -0400 Subject: [PATCH] Store resource attributes in the DB Store resource attributes that may be cached in the DB, saving the cost of re-resolving them later. This works for most resources, specifically those that do not override the get_attribute() method. Change-Id: I71f8aa431a60457326167b8c82adc03ca750eda6 Partial-Bug: #1660831 --- heat/db/sqlalchemy/api.py | 91 +++++++++++++++++-- heat/engine/attributes.py | 28 ++++++ heat/engine/check_resource.py | 8 +- heat/engine/resource.py | 31 ++++++- .../aws/autoscaling/autoscaling_group.py | 7 ++ .../resources/openstack/neutron/lbaas/pool.py | 1 + heat/engine/service.py | 15 ++- heat/objects/resource.py | 29 +++++- heat/objects/resource_properties_data.py | 9 +- heat/tests/test_event.py | 3 +- heat/tests/test_resource.py | 48 +++++++++- heat/tests/test_resource_properties_data.py | 4 +- 12 files changed, 252 insertions(+), 22 deletions(-) diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 085095b56e..273ca4fbb0 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -29,6 +29,7 @@ import six import sqlalchemy from sqlalchemy import and_ from sqlalchemy import func +from sqlalchemy import or_ from sqlalchemy import orm from sqlalchemy.orm import aliased as orm_aliased @@ -229,19 +230,29 @@ def resource_get_all(context): def resource_purge_deleted(context, stack_id): filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'} - query = context.session.query(models.Resource.id) + query = context.session.query(models.Resource) result = query.filter_by(**filters) - result.delete() + attr_ids = [r.attr_data_id for r in result if r.attr_data_id is not None] + with context.session.begin(subtransactions=True): + result.delete() + if attr_ids: + context.session.query(models.ResourcePropertiesData).filter( + models.ResourcePropertiesData.id.in_(attr_ids)).delete( + synchronize_session=False) + + +def _add_atomic_key_to_values(values, atomic_key): + if atomic_key is None: + values['atomic_key'] = 1 + else: + values['atomic_key'] = atomic_key + 1 def resource_update(context, resource_id, values, atomic_key, expected_engine_id=None): session = context.session with session.begin(subtransactions=True): - if atomic_key is None: - values['atomic_key'] = 1 - else: - values['atomic_key'] = atomic_key + 1 + _add_atomic_key_to_values(values, atomic_key) rows_updated = session.query(models.Resource).filter_by( id=resource_id, engine_id=expected_engine_id, atomic_key=atomic_key).update(values) @@ -260,6 +271,48 @@ def resource_delete(context, resource_id): resource = session.query(models.Resource).get(resource_id) if resource: session.delete(resource) + if resource.attr_data_id is not None: + attr_prop_data = session.query( + models.ResourcePropertiesData).get(resource.attr_data_id) + session.delete(attr_prop_data) + + +def resource_attr_id_set(context, resource_id, atomic_key, attr_id): + session = context.session + with session.begin(subtransactions=True): + values = {'attr_data_id': attr_id} + _add_atomic_key_to_values(values, atomic_key) + rows_updated = session.query(models.Resource).filter(and_( + models.Resource.id == resource_id, + models.Resource.atomic_key == atomic_key, + models.Resource.engine_id.is_(None), + or_(models.Resource.attr_data_id == attr_id, + models.Resource.attr_data_id.is_(None)))).update( + values) + if rows_updated > 0: + return True + else: + # Someone else set the attr_id first and/or we have a stale + # view of the resource based on atomic_key, so delete the + # resource_properties_data (attr) db row. + LOG.debug('Not updating res_id %(rid)s with attr_id %(aid)s', + {'rid': resource_id, 'aid': attr_id}) + session.query( + models.ResourcePropertiesData).filter( + models.ResourcePropertiesData.attr_id == attr_id).delete() + return False + + +def resource_attr_data_delete(context, resource_id, attr_id): + session = context.session + with session.begin(subtransactions=True): + resource = session.query(models.Resource).get(resource_id) + attr_prop_data = session.query( + models.ResourcePropertiesData).get(attr_id) + if resource: + resource.update({'attr_data_id': None}) + if attr_prop_data: + session.delete(attr_prop_data) def resource_data_get_all(context, resource_id, data=None): @@ -432,13 +485,21 @@ def engine_get_all_locked_by_stack(context, stack_id): return set(i[0] for i in query.all()) -def resource_prop_data_create(context, values): - obj_ref = models.ResourcePropertiesData() +def resource_prop_data_create_or_update(context, values, rpd_id=None): + if rpd_id is None: + obj_ref = models.ResourcePropertiesData() + else: + obj_ref = context.session.query( + models.ResourcePropertiesData).filter_by(id=rpd_id).first() obj_ref.update(values) obj_ref.save(context.session) return obj_ref +def resource_prop_data_create(context, values): + return resource_prop_data_create_or_update(context, values) + + def resource_prop_data_get(context, resource_prop_data_id): result = context.session.query(models.ResourcePropertiesData).get( resource_prop_data_id) @@ -694,8 +755,17 @@ def stack_delete(context, stack_id): 'msg': 'that does not exist'}) session = context.session with session.begin(): + attr_ids = [] + # normally the resources are deleted already by this point for r in s.resources: + if r.attr_data_id is not None: + attr_ids.append(r.attr_data_id) session.delete(r) + if attr_ids: + session.query( + models.ResourcePropertiesData.id).filter( + models.ResourcePropertiesData.id.in_(attr_ids)).delete( + synchronize_session=False) delete_softly(context, s) @@ -1325,6 +1395,11 @@ def _purge_stacks(stack_infos, engine, meta): resource.c.stack_id.in_(stack_ids)) rsrc_prop_data_ids = set( [i[0] for i in list(engine.execute(rsrc_prop_data_where))]) + rsrc_prop_data_where = sqlalchemy.select( + [resource.c.attr_data_id]).where( + resource.c.stack_id.in_(stack_ids)) + rsrc_prop_data_ids.update( + [i[0] for i in list(engine.execute(rsrc_prop_data_where))]) rsrc_prop_data_where = sqlalchemy.select( [event.c.rsrc_prop_data_id]).where( event.c.stack_id.in_(stack_ids)) diff --git a/heat/engine/attributes.py b/heat/engine/attributes.py index 6e2a4099bc..cfa92e676f 100644 --- a/heat/engine/attributes.py +++ b/heat/engine/attributes.py @@ -153,6 +153,10 @@ class Attributes(collections.Mapping): "Invalid attribute name '%s'" % ALL_ATTRIBUTES def reset_resolved_values(self): + if hasattr(self, '_resolved_values'): + self._has_new_resolved = len(self._resolved_values) > 0 + else: + self._has_new_resolved = False self._resolved_values = {} @staticmethod @@ -223,6 +227,29 @@ class Attributes(collections.Mapping): {'name': attrib.name, 'att_type': attrib.schema.BOOLEAN}) + @property + def cached_attrs(self): + # do not return an empty dict + if self._resolved_values: + return self._resolved_values + return None + + @cached_attrs.setter + def cached_attrs(self, c_attrs): + if c_attrs is None: + self._resolved_values = {} + else: + self._resolved_values = c_attrs + self._has_new_resolved = False + + def has_new_cached_attrs(self): + """Returns True if cached_attrs have changed + + Allows the caller to determine if this instance's cached_attrs + have been updated since they were initially set (if at all). + """ + return self._has_new_resolved + def __getitem__(self, key): if key not in self: raise KeyError(_('%(resource)s: Invalid attribute %(key)s') % @@ -242,6 +269,7 @@ class Attributes(collections.Mapping): self._validate_type(attrib, value) # only store if not None, it may resolve to an actual value # on subsequent calls + self._has_new_resolved = True self._resolved_values[key] = value return value diff --git a/heat/engine/check_resource.py b/heat/engine/check_resource.py index 999a78b5cd..bad5ce6295 100644 --- a/heat/engine/check_resource.py +++ b/heat/engine/check_resource.py @@ -232,7 +232,13 @@ class CheckResource(object): cnxt, self._rpc_client, req, current_traversal, set(graph[(req, fwd)]), graph_key, input_data, fwd, stack.adopt_stack_data) - + if is_update: + if input_forward_data is None: + # we haven't resolved attribute data for the resource, + # so clear any old attributes so they may be re-resolved + rsrc.clear_stored_attributes() + else: + rsrc.store_attributes() check_stack_complete(cnxt, stack, current_traversal, graph_key[0], deps, graph_key[1]) except exception.EntityNotFound as e: diff --git a/heat/engine/resource.py b/heat/engine/resource.py index 46849cec74..b6318bb55a 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -244,6 +244,7 @@ class Resource(status.ResourceStatus): self.id = None self.uuid = None self._data = None + self._attr_data_id = None self._rsrc_metadata = None self._rsrc_prop_data = None self._stored_properties_data = None @@ -289,6 +290,8 @@ class Resource(status.ResourceStatus): self, resource.data) except exception.NotFound: self._data = {} + self.attributes.cached_attrs = resource.attr_data + self._attr_data_id = resource.attr_data_id self._rsrc_metadata = resource.rsrc_metadata self._stored_properties_data = resource.properties_data self._rsrc_prop_data = resource.rsrc_prop_data @@ -921,6 +924,7 @@ class Resource(status.ResourceStatus): self._stored_properties_data = function.resolve(self.properties.data) if self._stored_properties_data != old_props: self._rsrc_prop_data = None + self.attributes.reset_resolved_values() def node_data(self): def get_attrs(attrs): @@ -2092,13 +2096,38 @@ class Resource(status.ResourceStatus): if new_state != old_state: self._add_event(action, status, reason) - self.attributes.reset_resolved_values() + if status != self.COMPLETE: + self.clear_stored_attributes() @property def state(self): """Returns state, tuple of action, status.""" return (self.action, self.status) + def store_attributes(self): + assert self.id is not None + if self.status != self.COMPLETE or self.action in (self.INIT, + self.DELETE): + return + if not self.attributes.has_new_cached_attrs(): + return + + try: + attr_data_id = resource_objects.Resource.store_attributes( + self.context, self.id, self._atomic_key, + self.attributes.cached_attrs, self._attr_data_id) + if attr_data_id is not None: + self._attr_data_id = attr_data_id + except Exception as ex: + LOG.error('store_attributes rsrc %(name)s %(id)s DB error %(ex)s', + {'name': self.name, 'id': self.id, 'ex': ex}) + + def clear_stored_attributes(self): + if self._attr_data_id: + resource_objects.Resource.attr_data_delete( + self.context, self.id, self._attr_data_id) + self.attributes.reset_resolved_values() + def get_reference_id(self): """Default implementation for function get_resource. diff --git a/heat/engine/resources/aws/autoscaling/autoscaling_group.py b/heat/engine/resources/aws/autoscaling/autoscaling_group.py index 89f70aeb0f..4a41e5d949 100644 --- a/heat/engine/resources/aws/autoscaling/autoscaling_group.py +++ b/heat/engine/resources/aws/autoscaling/autoscaling_group.py @@ -249,6 +249,13 @@ class AutoScalingGroup(cooldown.CooldownMixin, instgrp.InstanceGroup): min_adjustment_step, lower, upper) + def resize(self, capacity): + try: + super(AutoScalingGroup, self).resize(capacity) + finally: + # allow InstanceList to be re-resolved + self.clear_stored_attributes() + def handle_update(self, json_snippet, tmpl_diff, prop_diff): """Updates self.properties, if Properties has changed. diff --git a/heat/engine/resources/openstack/neutron/lbaas/pool.py b/heat/engine/resources/openstack/neutron/lbaas/pool.py index 9d09ae8a99..eb3d22a13c 100644 --- a/heat/engine/resources/openstack/neutron/lbaas/pool.py +++ b/heat/engine/resources/openstack/neutron/lbaas/pool.py @@ -148,6 +148,7 @@ class Pool(neutron.NeutronResource): ), MEMBERS_ATTR: attributes.Schema( _('Members associated with this pool.'), + cache_mode=attributes.Schema.CACHE_NONE, type=attributes.Schema.LIST ), } diff --git a/heat/engine/service.py b/heat/engine/service.py index a2fc9d6bdc..c09e215f1c 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -526,8 +526,21 @@ class EngineService(service.ServiceBase): else: stacks = parser.Stack.load_all(cnxt) - return [api.format_stack( + retval = [api.format_stack( stack, resolve_outputs=resolve_outputs) for stack in stacks] + if resolve_outputs: + # Cases where stored attributes may not exist for a resource: + # * For those resources that have attributes that were + # *not* referenced by other resources, their attributes + # are not resolved/stored over a stack update traversal + # * The resource is an AutoScalingGroup that received a signal + # * Near simultaneous updates (say by an update and a signal) + # * The first time resolving a pre-Pike stack + for stack in stacks: + if stack.convergence: + for res in six.itervalues(stack.resources): + res.store_attributes() + return retval def get_revision(self, cnxt): return cfg.CONF.revision['heat_revision'] diff --git a/heat/objects/resource.py b/heat/objects/resource.py index e656a26e44..102aa45c76 100644 --- a/heat/objects/resource.py +++ b/heat/objects/resource.py @@ -111,7 +111,7 @@ class Resource( resource['data'] = [resource_data.ResourceData._from_db_object( resource_data.ResourceData(context), resd ) for resd in db_resource.data] - else: + elif field != 'attr_data': resource[field] = db_resource[field] if db_resource['rsrc_prop_data'] is not None: @@ -139,15 +139,20 @@ class Resource( resource._properties_data = {} if db_resource['attr_data'] is not None: - resource['attr_data'] = \ - rpd.ResourcePropertiesData._from_db_object( - rpd.ResourcePropertiesData(context), context, - db_resource['attr_data']) + resource._attr_data = rpd.ResourcePropertiesData._from_db_object( + rpd.ResourcePropertiesData(context), context, + db_resource['attr_data']).data + else: + resource._attr_data = None resource._context = context resource.obj_reset_changes() return resource + @property + def attr_data(self): + return self._attr_data + @property def properties_data(self): return self._properties_data @@ -185,6 +190,10 @@ class Resource( def delete(cls, context, resource_id): db_api.resource_delete(context, resource_id) + @classmethod + def attr_data_delete(cls, context, resource_id, attr_id): + db_api.resource_attr_data_delete(context, resource_id, attr_id) + @classmethod def exchange_stacks(cls, context, resource_id1, resource_id2): return db_api.resource_exchange_stacks( @@ -278,6 +287,16 @@ class Resource( atomic_key=atomic_key, expected_engine_id=expected_engine_id) + @classmethod + def store_attributes(cls, context, resource_id, atomic_key, + attr_data, attr_id): + attr_id = rpd.ResourcePropertiesData.create_or_update( + context, attr_data, attr_id).id + if db_api.resource_attr_id_set( + context, resource_id, atomic_key, attr_id): + return attr_id + return None + def refresh(self): resource_db = db_api.resource_get(self._context, self.id, refresh=True) return self.__class__._from_db_object( diff --git a/heat/objects/resource_properties_data.py b/heat/objects/resource_properties_data.py index c5670aa6ed..c1c313c259 100644 --- a/heat/objects/resource_properties_data.py +++ b/heat/objects/resource_properties_data.py @@ -55,14 +55,19 @@ class ResourcePropertiesData( return rpd @classmethod - def create(cls, context, data): + def create_or_update(cls, context, data, rpd_id=None): properties_data_encrypted, properties_data = \ ResourcePropertiesData.encrypt_properties_data(data) values = {'encrypted': properties_data_encrypted, 'data': properties_data} - db_obj = db_api.resource_prop_data_create(context, values) + db_obj = db_api.resource_prop_data_create_or_update( + context, values, rpd_id) return cls._from_db_object(cls(), context, db_obj, data) + @classmethod + def create(cls, context, data): + return ResourcePropertiesData.create_or_update(context, data) + @staticmethod def encrypt_properties_data(data): if cfg.CONF.encrypt_parameters_and_properties and data: diff --git a/heat/tests/test_event.py b/heat/tests/test_event.py index 341ae2b988..40f598ac3c 100644 --- a/heat/tests/test_event.py +++ b/heat/tests/test_event.py @@ -252,7 +252,8 @@ class EventTest(EventCommon): cfg.CONF.set_override('encrypt_parameters_and_properties', True) data = {'p1': 'hello', 'p2': 'too soon?'} - rpd_obj = rpd_object.ResourcePropertiesData().create(self.ctx, data) + rpd_obj = rpd_object.ResourcePropertiesData().create_or_update( + self.ctx, data) e_obj = event_object.Event().create( self.ctx, {'stack_id': self.stack.id, diff --git a/heat/tests/test_resource.py b/heat/tests/test_resource.py index 2a2e59298a..b6eca48359 100644 --- a/heat/tests/test_resource.py +++ b/heat/tests/test_resource.py @@ -609,11 +609,57 @@ class ResourceTest(common.HeatTestCase): self.assertEqual(res_obj.status, res.COMPLETE) self.assertRaises(AttributeError, getattr, res_obj, 'action') + def test_attributes_store(self): + res_def = rsrc_defn.ResourceDefinition('test_resource', + 'ResWithStringPropAndAttr') + res = generic_rsrc.ResWithStringPropAndAttr( + 'test_res_attr_store', res_def, self.stack) + + res.action = res.CREATE + res.status = res.COMPLETE + res.store() + res.store_attributes() + # attr was not resolved, cache was not warmed, nothing to store + self.assertIsNone(res._attr_data_id) + + with mock.patch.object(res, '_resolve_attribute') as res_attr: + attr_val = '0123 four' + res_attr.return_value = attr_val + res.attributes['string'] + + # attr cache is warmed, now store_attributes persists something + res.store_attributes() + self.assertIsNotNone(res._attr_data_id) + + # verify the attribute rpd obj that was stored matches + self.assertEqual({'string': attr_val}, + rpd_object.ResourcePropertiesData.get_by_id( + res.context, res._attr_data_id).data) + + def test_attributes_load_stored(self): + res_def = rsrc_defn.ResourceDefinition('test_resource', + 'ResWithStringPropAndAttr') + res = generic_rsrc.ResWithStringPropAndAttr( + 'test_res_attr_store', res_def, self.stack) + + res.action = res.UPDATE + res.status = res.COMPLETE + res.store() + attr_data = {'string': 'word'} + resource_objects.Resource.store_attributes( + res.context, res.id, res._atomic_key, attr_data, None) + res._load_data(resource_objects.Resource.get_obj( + res.context, res.id)) + with mock.patch.object(res, '_resolve_attribute') as res_attr: + self.assertEqual(attr_data, res.attributes._resolved_values) + self.assertEqual('word', res.attributes['string']) + self.assertEqual(0, res_attr.call_count) + def test_resource_object_resource_properties_data(self): cfg.CONF.set_override('encrypt_parameters_and_properties', True) data = {'p1': 'i see', 'p2': 'good times, good times'} - rpd_obj = rpd_object.ResourcePropertiesData().create( + rpd_obj = rpd_object.ResourcePropertiesData().create_or_update( self.stack.context, data) rpd_db_obj = self.stack.context.session.query( models.ResourcePropertiesData).get(rpd_obj.id) diff --git a/heat/tests/test_resource_properties_data.py b/heat/tests/test_resource_properties_data.py index a0e37c7324..2ef5374cf1 100644 --- a/heat/tests/test_resource_properties_data.py +++ b/heat/tests/test_resource_properties_data.py @@ -31,8 +31,8 @@ class ResourcePropertiesDataTest(common.HeatTestCase): 'prop5': True} def _get_rpd_and_db_obj(self): - rpd_obj = rpd_object.ResourcePropertiesData().create(self.ctx, - self.data) + rpd_obj = rpd_object.ResourcePropertiesData().create_or_update( + self.ctx, self.data) db_obj = self.ctx.session.query( models.ResourcePropertiesData).get(rpd_obj.id) self.assertEqual(len(self.data), len(db_obj['data']))