Store resource attributes in the DB
Store resource attributes that may be cached in the DB, saving the cost of re-resolving them later. This works for most resources, specifically those that do not override the get_attribute() method.

Change-Id: I71f8aa431a60457326167b8c82adc03ca750eda6
Partial-Bug: #1660831
commit 0550659f5f
parent 0c9adf8e48
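The idea, in isolation: an attribute that has been resolved once is kept in an in-memory cache, and that cache is persisted alongside the resource so later operations can skip the (potentially expensive) re-resolution. A minimal stand-alone sketch of that caching pattern follows; SimpleAttributeCache and resolve_fn are made-up names for illustration, not part of Heat.

# Illustrative sketch only: shows the resolve-once/persist-later idea this
# change applies to resource attributes. Nothing here is Heat code.
class SimpleAttributeCache(object):
    def __init__(self, resolve_fn, stored=None):
        self._resolve = resolve_fn          # expensive, e.g. a remote API call
        self._values = dict(stored or {})   # warm-start from a stored copy
        self.dirty = False                  # True when there is new data to persist

    def get(self, name):
        if name not in self._values:
            self._values[name] = self._resolve(name)   # resolve at most once
            self.dirty = True
        return self._values[name]

    def to_store(self):
        # what would be written back to the DB, or None if nothing changed
        return dict(self._values) if self.dirty else None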
@@ -29,6 +29,7 @@ import six
 import sqlalchemy
 from sqlalchemy import and_
 from sqlalchemy import func
 from sqlalchemy import or_
 from sqlalchemy import orm
 from sqlalchemy.orm import aliased as orm_aliased

@@ -229,19 +230,29 @@ def resource_get_all(context):


 def resource_purge_deleted(context, stack_id):
     filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'}
-    query = context.session.query(models.Resource.id)
+    query = context.session.query(models.Resource)
     result = query.filter_by(**filters)
-    result.delete()
+    attr_ids = [r.attr_data_id for r in result if r.attr_data_id is not None]
+    with context.session.begin(subtransactions=True):
+        result.delete()
+        if attr_ids:
+            context.session.query(models.ResourcePropertiesData).filter(
+                models.ResourcePropertiesData.id.in_(attr_ids)).delete(
+                    synchronize_session=False)


+def _add_atomic_key_to_values(values, atomic_key):
+    if atomic_key is None:
+        values['atomic_key'] = 1
+    else:
+        values['atomic_key'] = atomic_key + 1


 def resource_update(context, resource_id, values, atomic_key,
                     expected_engine_id=None):
     session = context.session
     with session.begin(subtransactions=True):
-        if atomic_key is None:
-            values['atomic_key'] = 1
-        else:
-            values['atomic_key'] = atomic_key + 1
+        _add_atomic_key_to_values(values, atomic_key)
         rows_updated = session.query(models.Resource).filter_by(
             id=resource_id, engine_id=expected_engine_id,
             atomic_key=atomic_key).update(values)
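resource_update() above is an optimistic compare-and-swap: _add_atomic_key_to_values() bumps the key, and the UPDATE only matches a row whose stored atomic_key (and engine_id) still equal the caller's last-seen values, so a zero row count means another writer got there first. A self-contained sketch of the same pattern with a throwaway SQLite table (the table and column names here are illustrative only, not Heat's schema):

import sqlalchemy as sa

engine = sa.create_engine('sqlite://')
meta = sa.MetaData()
resource = sa.Table('resource', meta,
                    sa.Column('id', sa.Integer, primary_key=True),
                    sa.Column('atomic_key', sa.Integer),
                    sa.Column('attr_data_id', sa.Integer, nullable=True))
meta.create_all(engine)

def update_with_expected_key(conn, rid, expected_key, values):
    # bump the key the same way _add_atomic_key_to_values() does
    values['atomic_key'] = 1 if expected_key is None else expected_key + 1
    result = conn.execute(
        resource.update().where(
            sa.and_(resource.c.id == rid,
                    resource.c.atomic_key == expected_key)).values(**values))
    return result.rowcount  # 0 means the caller's view was stale

with engine.connect() as conn:
    conn.execute(resource.insert().values(id=1, atomic_key=None))
    print(update_with_expected_key(conn, 1, None, {'attr_data_id': 7}))  # 1: won
    print(update_with_expected_key(conn, 1, None, {'attr_data_id': 9}))  # 0: stale key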
@@ -260,6 +271,48 @@ def resource_delete(context, resource_id):
         resource = session.query(models.Resource).get(resource_id)
         if resource:
             session.delete(resource)
+            if resource.attr_data_id is not None:
+                attr_prop_data = session.query(
+                    models.ResourcePropertiesData).get(resource.attr_data_id)
+                session.delete(attr_prop_data)


+def resource_attr_id_set(context, resource_id, atomic_key, attr_id):
+    session = context.session
+    with session.begin(subtransactions=True):
+        values = {'attr_data_id': attr_id}
+        _add_atomic_key_to_values(values, atomic_key)
+        rows_updated = session.query(models.Resource).filter(and_(
+            models.Resource.id == resource_id,
+            models.Resource.atomic_key == atomic_key,
+            models.Resource.engine_id.is_(None),
+            or_(models.Resource.attr_data_id == attr_id,
+                models.Resource.attr_data_id.is_(None)))).update(
+                    values)
+        if rows_updated > 0:
+            return True
+        else:
+            # Someone else set the attr_id first and/or we have a stale
+            # view of the resource based on atomic_key, so delete the
+            # resource_properties_data (attr) db row.
+            LOG.debug('Not updating res_id %(rid)s with attr_id %(aid)s',
+                      {'rid': resource_id, 'aid': attr_id})
+            session.query(
+                models.ResourcePropertiesData).filter(
+                    models.ResourcePropertiesData.attr_id == attr_id).delete()
+            return False


+def resource_attr_data_delete(context, resource_id, attr_id):
+    session = context.session
+    with session.begin(subtransactions=True):
+        resource = session.query(models.Resource).get(resource_id)
+        attr_prop_data = session.query(
+            models.ResourcePropertiesData).get(attr_id)
+        if resource:
+            resource.update({'attr_data_id': None})
+        if attr_prop_data:
+            session.delete(attr_prop_data)


 def resource_data_get_all(context, resource_id, data=None):
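The WHERE clause in resource_attr_id_set() only lets the update through when the resource row is not locked by an engine, the caller's atomic_key is still current, and attr_data_id is either unset or already points at the same data row; otherwise it returns False and the freshly written attribute row is discarded. A plain-Python restatement of that predicate (the dict-shaped row and may_set_attr_id are purely illustrative):

def may_set_attr_id(row, caller_atomic_key, new_attr_id):
    # mirrors the and_/or_ filter above, with a dict standing in for the row
    return (row['engine_id'] is None and
            row['atomic_key'] == caller_atomic_key and
            row['attr_data_id'] in (None, new_attr_id))

print(may_set_attr_id({'engine_id': None, 'atomic_key': 3, 'attr_data_id': None}, 3, 42))   # True
print(may_set_attr_id({'engine_id': 'abc', 'atomic_key': 3, 'attr_data_id': None}, 3, 42))  # False: locked
print(may_set_attr_id({'engine_id': None, 'atomic_key': 4, 'attr_data_id': None}, 3, 42))   # False: stale key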
@@ -432,13 +485,21 @@ def engine_get_all_locked_by_stack(context, stack_id):
     return set(i[0] for i in query.all())


-def resource_prop_data_create(context, values):
-    obj_ref = models.ResourcePropertiesData()
+def resource_prop_data_create_or_update(context, values, rpd_id=None):
+    if rpd_id is None:
+        obj_ref = models.ResourcePropertiesData()
+    else:
+        obj_ref = context.session.query(
+            models.ResourcePropertiesData).filter_by(id=rpd_id).first()
     obj_ref.update(values)
     obj_ref.save(context.session)
     return obj_ref


+def resource_prop_data_create(context, values):
+    return resource_prop_data_create_or_update(context, values)


 def resource_prop_data_get(context, resource_prop_data_id):
     result = context.session.query(models.ResourcePropertiesData).get(
         resource_prop_data_id)
@@ -694,8 +755,17 @@ def stack_delete(context, stack_id):
                                    'msg': 'that does not exist'})
     session = context.session
     with session.begin():
+        attr_ids = []
         # normally the resources are deleted already by this point
         for r in s.resources:
+            if r.attr_data_id is not None:
+                attr_ids.append(r.attr_data_id)
             session.delete(r)
+        if attr_ids:
+            session.query(
+                models.ResourcePropertiesData.id).filter(
+                    models.ResourcePropertiesData.id.in_(attr_ids)).delete(
+                        synchronize_session=False)

         delete_softly(context, s)
@@ -1325,6 +1395,11 @@ def _purge_stacks(stack_infos, engine, meta):
             resource.c.stack_id.in_(stack_ids))
     rsrc_prop_data_ids = set(
         [i[0] for i in list(engine.execute(rsrc_prop_data_where))])
+    rsrc_prop_data_where = sqlalchemy.select(
+        [resource.c.attr_data_id]).where(
+            resource.c.stack_id.in_(stack_ids))
+    rsrc_prop_data_ids.update(
+        [i[0] for i in list(engine.execute(rsrc_prop_data_where))])
     rsrc_prop_data_where = sqlalchemy.select(
         [event.c.rsrc_prop_data_id]).where(
             event.c.stack_id.in_(stack_ids))
@@ -153,6 +153,10 @@ class Attributes(collections.Mapping):
                 "Invalid attribute name '%s'" % ALL_ATTRIBUTES

     def reset_resolved_values(self):
+        if hasattr(self, '_resolved_values'):
+            self._has_new_resolved = len(self._resolved_values) > 0
+        else:
+            self._has_new_resolved = False
         self._resolved_values = {}

     @staticmethod
@@ -223,6 +227,29 @@ class Attributes(collections.Mapping):
                     {'name': attrib.name,
                      'att_type': attrib.schema.BOOLEAN})

+    @property
+    def cached_attrs(self):
+        # do not return an empty dict
+        if self._resolved_values:
+            return self._resolved_values
+        return None
+
+    @cached_attrs.setter
+    def cached_attrs(self, c_attrs):
+        if c_attrs is None:
+            self._resolved_values = {}
+        else:
+            self._resolved_values = c_attrs
+        self._has_new_resolved = False
+
+    def has_new_cached_attrs(self):
+        """Returns True if cached_attrs have changed
+
+        Allows the caller to determine if this instance's cached_attrs
+        have been updated since they were initially set (if at all).
+        """
+        return self._has_new_resolved
+
     def __getitem__(self, key):
         if key not in self:
             raise KeyError(_('%(resource)s: Invalid attribute %(key)s') %
@@ -242,6 +269,7 @@ class Attributes(collections.Mapping):
                 self._validate_type(attrib, value)
                 # only store if not None, it may resolve to an actual value
                 # on subsequent calls
+                self._has_new_resolved = True
                 self._resolved_values[key] = value
             return value
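The contract established above: setting cached_attrs from stored data marks the cache as not new, and any attribute resolved afterwards flips has_new_cached_attrs() so the engine knows another DB write is worthwhile. A stand-alone toy illustration of that contract (MiniAttributes is a made-up stand-in, not the Heat class):

class MiniAttributes(object):
    def __init__(self, resolver):
        self._resolver = resolver
        self._resolved_values = {}
        self._has_new_resolved = False

    @property
    def cached_attrs(self):
        return self._resolved_values or None

    @cached_attrs.setter
    def cached_attrs(self, c_attrs):
        self._resolved_values = c_attrs or {}
        self._has_new_resolved = False

    def __getitem__(self, key):
        if key not in self._resolved_values:
            self._resolved_values[key] = self._resolver(key)  # expensive path
            self._has_new_resolved = True
        return self._resolved_values[key]

attrs = MiniAttributes(resolver=lambda key: 'resolved-%s' % key)
attrs.cached_attrs = {'first_address': '10.0.0.4'}       # warmed from the DB
print(attrs['first_address'], attrs._has_new_resolved)   # 10.0.0.4 False
print(attrs['name'], attrs._has_new_resolved)            # resolved-name True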
@@ -232,7 +232,13 @@ class CheckResource(object):
                     cnxt, self._rpc_client, req, current_traversal,
                     set(graph[(req, fwd)]), graph_key, input_data, fwd,
                     stack.adopt_stack_data)
             if is_update:
+                if input_forward_data is None:
+                    # we haven't resolved attribute data for the resource,
+                    # so clear any old attributes so they may be re-resolved
+                    rsrc.clear_stored_attributes()
+                else:
+                    rsrc.store_attributes()
                 check_stack_complete(cnxt, stack, current_traversal,
                                      graph_key[0], deps, graph_key[1])
         except exception.EntityNotFound as e:
@@ -244,6 +244,7 @@ class Resource(status.ResourceStatus):
         self.id = None
         self.uuid = None
         self._data = None
+        self._attr_data_id = None
         self._rsrc_metadata = None
         self._rsrc_prop_data = None
         self._stored_properties_data = None
@@ -289,6 +290,8 @@ class Resource(status.ResourceStatus):
                 self, resource.data)
         except exception.NotFound:
             self._data = {}
+        self.attributes.cached_attrs = resource.attr_data
+        self._attr_data_id = resource.attr_data_id
         self._rsrc_metadata = resource.rsrc_metadata
         self._stored_properties_data = resource.properties_data
         self._rsrc_prop_data = resource.rsrc_prop_data
@@ -921,6 +924,7 @@ class Resource(status.ResourceStatus):
         self._stored_properties_data = function.resolve(self.properties.data)
         if self._stored_properties_data != old_props:
             self._rsrc_prop_data = None
+        self.attributes.reset_resolved_values()

     def node_data(self):
         def get_attrs(attrs):
@@ -2092,13 +2096,38 @@ class Resource(status.ResourceStatus):
         if new_state != old_state:
             self._add_event(action, status, reason)

-        self.attributes.reset_resolved_values()
+        if status != self.COMPLETE:
+            self.clear_stored_attributes()

     @property
     def state(self):
         """Returns state, tuple of action, status."""
         return (self.action, self.status)

+    def store_attributes(self):
+        assert self.id is not None
+        if self.status != self.COMPLETE or self.action in (self.INIT,
+                                                           self.DELETE):
+            return
+        if not self.attributes.has_new_cached_attrs():
+            return
+
+        try:
+            attr_data_id = resource_objects.Resource.store_attributes(
+                self.context, self.id, self._atomic_key,
+                self.attributes.cached_attrs, self._attr_data_id)
+            if attr_data_id is not None:
+                self._attr_data_id = attr_data_id
+        except Exception as ex:
+            LOG.error('store_attributes rsrc %(name)s %(id)s DB error %(ex)s',
+                      {'name': self.name, 'id': self.id, 'ex': ex})
+
+    def clear_stored_attributes(self):
+        if self._attr_data_id:
+            resource_objects.Resource.attr_data_delete(
+                self.context, self.id, self._attr_data_id)
+        self.attributes.reset_resolved_values()
+
     def get_reference_id(self):
         """Default implementation for function get_resource.
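store_attributes() above only writes when the resource is in a usable end state and the attribute cache actually gained new values; every other call is a cheap no-op. A self-contained restatement of just that guard logic (FakeResource and should_store are invented for the example; no DB is involved):

import collections

FakeResource = collections.namedtuple('FakeResource',
                                      'action status has_new_cached')

def should_store(res, INIT='INIT', DELETE='DELETE', COMPLETE='COMPLETE'):
    if res.status != COMPLETE or res.action in (INIT, DELETE):
        return False           # never persist attrs for incomplete or deleted resources
    return res.has_new_cached  # only hit the DB when the cache actually changed

print(should_store(FakeResource('CREATE', 'COMPLETE', True)))   # True
print(should_store(FakeResource('DELETE', 'COMPLETE', True)))   # False
print(should_store(FakeResource('UPDATE', 'COMPLETE', False)))  # False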
@@ -249,6 +249,13 @@ class AutoScalingGroup(cooldown.CooldownMixin, instgrp.InstanceGroup):
                                           min_adjustment_step,
                                           lower, upper)

+    def resize(self, capacity):
+        try:
+            super(AutoScalingGroup, self).resize(capacity)
+        finally:
+            # allow InstanceList to be re-resolved
+            self.clear_stored_attributes()
+
     def handle_update(self, json_snippet, tmpl_diff, prop_diff):
         """Updates self.properties, if Properties has changed.
@@ -148,6 +148,7 @@ class Pool(neutron.NeutronResource):
         ),
         MEMBERS_ATTR: attributes.Schema(
             _('Members associated with this pool.'),
+            cache_mode=attributes.Schema.CACHE_NONE,
             type=attributes.Schema.LIST
         ),
     }
@@ -526,8 +526,21 @@ class EngineService(service.ServiceBase):
         else:
             stacks = parser.Stack.load_all(cnxt)

-        return [api.format_stack(
+        retval = [api.format_stack(
             stack, resolve_outputs=resolve_outputs) for stack in stacks]
+        if resolve_outputs:
+            # Cases where stored attributes may not exist for a resource:
+            # * For those resources that have attributes that were
+            #   *not* referenced by other resources, their attributes
+            #   are not resolved/stored over a stack update traversal
+            # * The resource is an AutoScalingGroup that received a signal
+            # * Near simultaneous updates (say by an update and a signal)
+            # * The first time resolving a pre-Pike stack
+            for stack in stacks:
+                if stack.convergence:
+                    for res in six.itervalues(stack.resources):
+                        res.store_attributes()
+        return retval

     def get_revision(self, cnxt):
         return cfg.CONF.revision['heat_revision']
@@ -111,7 +111,7 @@ class Resource(
                 resource['data'] = [resource_data.ResourceData._from_db_object(
                     resource_data.ResourceData(context), resd
                 ) for resd in db_resource.data]
-            else:
+            elif field != 'attr_data':
                 resource[field] = db_resource[field]

         if db_resource['rsrc_prop_data'] is not None:
@@ -139,15 +139,20 @@ class Resource(
             resource._properties_data = {}

         if db_resource['attr_data'] is not None:
-            resource['attr_data'] = \
-                rpd.ResourcePropertiesData._from_db_object(
-                    rpd.ResourcePropertiesData(context), context,
-                    db_resource['attr_data'])
+            resource._attr_data = rpd.ResourcePropertiesData._from_db_object(
+                rpd.ResourcePropertiesData(context), context,
+                db_resource['attr_data']).data
+        else:
+            resource._attr_data = None

         resource._context = context
         resource.obj_reset_changes()
         return resource

+    @property
+    def attr_data(self):
+        return self._attr_data
+
     @property
     def properties_data(self):
         return self._properties_data
@@ -185,6 +190,10 @@ class Resource(
     def delete(cls, context, resource_id):
         db_api.resource_delete(context, resource_id)

+    @classmethod
+    def attr_data_delete(cls, context, resource_id, attr_id):
+        db_api.resource_attr_data_delete(context, resource_id, attr_id)
+
     @classmethod
     def exchange_stacks(cls, context, resource_id1, resource_id2):
         return db_api.resource_exchange_stacks(
@@ -278,6 +287,16 @@ class Resource(
                                   atomic_key=atomic_key,
                                   expected_engine_id=expected_engine_id)

+    @classmethod
+    def store_attributes(cls, context, resource_id, atomic_key,
+                         attr_data, attr_id):
+        attr_id = rpd.ResourcePropertiesData.create_or_update(
+            context, attr_data, attr_id).id
+        if db_api.resource_attr_id_set(
+                context, resource_id, atomic_key, attr_id):
+            return attr_id
+        return None
+
     def refresh(self):
         resource_db = db_api.resource_get(self._context, self.id, refresh=True)
         return self.__class__._from_db_object(
@@ -55,14 +55,19 @@ class ResourcePropertiesData(
         return rpd

     @classmethod
-    def create(cls, context, data):
+    def create_or_update(cls, context, data, rpd_id=None):
         properties_data_encrypted, properties_data = \
             ResourcePropertiesData.encrypt_properties_data(data)
         values = {'encrypted': properties_data_encrypted,
                   'data': properties_data}
-        db_obj = db_api.resource_prop_data_create(context, values)
+        db_obj = db_api.resource_prop_data_create_or_update(
+            context, values, rpd_id)
         return cls._from_db_object(cls(), context, db_obj, data)

+    @classmethod
+    def create(cls, context, data):
+        return ResourcePropertiesData.create_or_update(context, data)
+
     @staticmethod
     def encrypt_properties_data(data):
         if cfg.CONF.encrypt_parameters_and_properties and data:
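create_or_update() above creates a fresh resource_properties_data row when rpd_id is None and otherwise rewrites the existing row in place, which is what lets a resource keep reusing one attr_data_id instead of accumulating rows. A toy dict-backed restatement of that pattern (the _STORE and _ids names are invented for the example):

import itertools

_STORE = {}
_ids = itertools.count(1)

def prop_data_create_or_update(values, rpd_id=None):
    if rpd_id is None:
        rpd_id = next(_ids)           # brand-new row
        _STORE[rpd_id] = {}
    _STORE[rpd_id].update(values)     # subsequent stores reuse the same row
    return rpd_id

rid = prop_data_create_or_update({'data': {'string': 'one'}})
assert prop_data_create_or_update({'data': {'string': 'two'}}, rid) == rid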
@@ -252,7 +252,8 @@ class EventTest(EventCommon):
         cfg.CONF.set_override('encrypt_parameters_and_properties', True)
         data = {'p1': 'hello',
                 'p2': 'too soon?'}
-        rpd_obj = rpd_object.ResourcePropertiesData().create(self.ctx, data)
+        rpd_obj = rpd_object.ResourcePropertiesData().create_or_update(
+            self.ctx, data)
         e_obj = event_object.Event().create(
             self.ctx,
             {'stack_id': self.stack.id,
@@ -609,11 +609,57 @@ class ResourceTest(common.HeatTestCase):
         self.assertEqual(res_obj.status, res.COMPLETE)
         self.assertRaises(AttributeError, getattr, res_obj, 'action')

+    def test_attributes_store(self):
+        res_def = rsrc_defn.ResourceDefinition('test_resource',
+                                               'ResWithStringPropAndAttr')
+        res = generic_rsrc.ResWithStringPropAndAttr(
+            'test_res_attr_store', res_def, self.stack)
+
+        res.action = res.CREATE
+        res.status = res.COMPLETE
+        res.store()
+        res.store_attributes()
+        # attr was not resolved, cache was not warmed, nothing to store
+        self.assertIsNone(res._attr_data_id)
+
+        with mock.patch.object(res, '_resolve_attribute') as res_attr:
+            attr_val = '0123 four'
+            res_attr.return_value = attr_val
+            res.attributes['string']
+
+            # attr cache is warmed, now store_attributes persists something
+            res.store_attributes()
+            self.assertIsNotNone(res._attr_data_id)
+
+            # verify the attribute rpd obj that was stored matches
+            self.assertEqual({'string': attr_val},
+                             rpd_object.ResourcePropertiesData.get_by_id(
+                                 res.context, res._attr_data_id).data)
+
+    def test_attributes_load_stored(self):
+        res_def = rsrc_defn.ResourceDefinition('test_resource',
+                                               'ResWithStringPropAndAttr')
+        res = generic_rsrc.ResWithStringPropAndAttr(
+            'test_res_attr_store', res_def, self.stack)
+
+        res.action = res.UPDATE
+        res.status = res.COMPLETE
+        res.store()
+        attr_data = {'string': 'word'}
+        resource_objects.Resource.store_attributes(
+            res.context, res.id, res._atomic_key, attr_data, None)
+        res._load_data(resource_objects.Resource.get_obj(
+            res.context, res.id))
+        with mock.patch.object(res, '_resolve_attribute') as res_attr:
+            self.assertEqual(attr_data, res.attributes._resolved_values)
+            self.assertEqual('word', res.attributes['string'])
+            self.assertEqual(0, res_attr.call_count)
+
     def test_resource_object_resource_properties_data(self):
         cfg.CONF.set_override('encrypt_parameters_and_properties', True)
         data = {'p1': 'i see',
                 'p2': 'good times, good times'}
-        rpd_obj = rpd_object.ResourcePropertiesData().create(
+        rpd_obj = rpd_object.ResourcePropertiesData().create_or_update(
             self.stack.context, data)
         rpd_db_obj = self.stack.context.session.query(
             models.ResourcePropertiesData).get(rpd_obj.id)
@@ -31,8 +31,8 @@ class ResourcePropertiesDataTest(common.HeatTestCase):
                      'prop5': True}

     def _get_rpd_and_db_obj(self):
-        rpd_obj = rpd_object.ResourcePropertiesData().create(self.ctx,
-                                                             self.data)
+        rpd_obj = rpd_object.ResourcePropertiesData().create_or_update(
+            self.ctx, self.data)
         db_obj = self.ctx.session.query(
             models.ResourcePropertiesData).get(rpd_obj.id)
         self.assertEqual(len(self.data), len(db_obj['data']))