Merge "Store resource attributes in the DB"

This commit is contained in:
Jenkins 2017-06-27 09:28:07 +00:00 committed by Gerrit Code Review
commit 27a60a6931
12 changed files with 252 additions and 22 deletions

View File

@ -29,6 +29,7 @@ import six
import sqlalchemy
from sqlalchemy import and_
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy import orm
from sqlalchemy.orm import aliased as orm_aliased
@ -229,19 +230,29 @@ def resource_get_all(context):
def resource_purge_deleted(context, stack_id):
filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'}
query = context.session.query(models.Resource.id)
query = context.session.query(models.Resource)
result = query.filter_by(**filters)
result.delete()
attr_ids = [r.attr_data_id for r in result if r.attr_data_id is not None]
with context.session.begin(subtransactions=True):
result.delete()
if attr_ids:
context.session.query(models.ResourcePropertiesData).filter(
models.ResourcePropertiesData.id.in_(attr_ids)).delete(
synchronize_session=False)
def _add_atomic_key_to_values(values, atomic_key):
if atomic_key is None:
values['atomic_key'] = 1
else:
values['atomic_key'] = atomic_key + 1
def resource_update(context, resource_id, values, atomic_key,
expected_engine_id=None):
session = context.session
with session.begin(subtransactions=True):
if atomic_key is None:
values['atomic_key'] = 1
else:
values['atomic_key'] = atomic_key + 1
_add_atomic_key_to_values(values, atomic_key)
rows_updated = session.query(models.Resource).filter_by(
id=resource_id, engine_id=expected_engine_id,
atomic_key=atomic_key).update(values)
@ -260,6 +271,48 @@ def resource_delete(context, resource_id):
resource = session.query(models.Resource).get(resource_id)
if resource:
session.delete(resource)
if resource.attr_data_id is not None:
attr_prop_data = session.query(
models.ResourcePropertiesData).get(resource.attr_data_id)
session.delete(attr_prop_data)
def resource_attr_id_set(context, resource_id, atomic_key, attr_id):
session = context.session
with session.begin(subtransactions=True):
values = {'attr_data_id': attr_id}
_add_atomic_key_to_values(values, atomic_key)
rows_updated = session.query(models.Resource).filter(and_(
models.Resource.id == resource_id,
models.Resource.atomic_key == atomic_key,
models.Resource.engine_id.is_(None),
or_(models.Resource.attr_data_id == attr_id,
models.Resource.attr_data_id.is_(None)))).update(
values)
if rows_updated > 0:
return True
else:
# Someone else set the attr_id first and/or we have a stale
# view of the resource based on atomic_key, so delete the
# resource_properties_data (attr) db row.
LOG.debug('Not updating res_id %(rid)s with attr_id %(aid)s',
{'rid': resource_id, 'aid': attr_id})
session.query(
models.ResourcePropertiesData).filter(
models.ResourcePropertiesData.attr_id == attr_id).delete()
return False
def resource_attr_data_delete(context, resource_id, attr_id):
session = context.session
with session.begin(subtransactions=True):
resource = session.query(models.Resource).get(resource_id)
attr_prop_data = session.query(
models.ResourcePropertiesData).get(attr_id)
if resource:
resource.update({'attr_data_id': None})
if attr_prop_data:
session.delete(attr_prop_data)
def resource_data_get_all(context, resource_id, data=None):
@ -432,13 +485,21 @@ def engine_get_all_locked_by_stack(context, stack_id):
return set(i[0] for i in query.all())
def resource_prop_data_create(context, values):
obj_ref = models.ResourcePropertiesData()
def resource_prop_data_create_or_update(context, values, rpd_id=None):
if rpd_id is None:
obj_ref = models.ResourcePropertiesData()
else:
obj_ref = context.session.query(
models.ResourcePropertiesData).filter_by(id=rpd_id).first()
obj_ref.update(values)
obj_ref.save(context.session)
return obj_ref
def resource_prop_data_create(context, values):
return resource_prop_data_create_or_update(context, values)
def resource_prop_data_get(context, resource_prop_data_id):
result = context.session.query(models.ResourcePropertiesData).get(
resource_prop_data_id)
@ -694,8 +755,17 @@ def stack_delete(context, stack_id):
'msg': 'that does not exist'})
session = context.session
with session.begin():
attr_ids = []
# normally the resources are deleted already by this point
for r in s.resources:
if r.attr_data_id is not None:
attr_ids.append(r.attr_data_id)
session.delete(r)
if attr_ids:
session.query(
models.ResourcePropertiesData.id).filter(
models.ResourcePropertiesData.id.in_(attr_ids)).delete(
synchronize_session=False)
delete_softly(context, s)
@ -1325,6 +1395,11 @@ def _purge_stacks(stack_infos, engine, meta):
resource.c.stack_id.in_(stack_ids))
rsrc_prop_data_ids = set(
[i[0] for i in list(engine.execute(rsrc_prop_data_where))])
rsrc_prop_data_where = sqlalchemy.select(
[resource.c.attr_data_id]).where(
resource.c.stack_id.in_(stack_ids))
rsrc_prop_data_ids.update(
[i[0] for i in list(engine.execute(rsrc_prop_data_where))])
rsrc_prop_data_where = sqlalchemy.select(
[event.c.rsrc_prop_data_id]).where(
event.c.stack_id.in_(stack_ids))

View File

@ -153,6 +153,10 @@ class Attributes(collections.Mapping):
"Invalid attribute name '%s'" % ALL_ATTRIBUTES
def reset_resolved_values(self):
if hasattr(self, '_resolved_values'):
self._has_new_resolved = len(self._resolved_values) > 0
else:
self._has_new_resolved = False
self._resolved_values = {}
@staticmethod
@ -223,6 +227,29 @@ class Attributes(collections.Mapping):
{'name': attrib.name,
'att_type': attrib.schema.BOOLEAN})
@property
def cached_attrs(self):
# do not return an empty dict
if self._resolved_values:
return self._resolved_values
return None
@cached_attrs.setter
def cached_attrs(self, c_attrs):
if c_attrs is None:
self._resolved_values = {}
else:
self._resolved_values = c_attrs
self._has_new_resolved = False
def has_new_cached_attrs(self):
"""Returns True if cached_attrs have changed
Allows the caller to determine if this instance's cached_attrs
have been updated since they were initially set (if at all).
"""
return self._has_new_resolved
def __getitem__(self, key):
if key not in self:
raise KeyError(_('%(resource)s: Invalid attribute %(key)s') %
@ -242,6 +269,7 @@ class Attributes(collections.Mapping):
self._validate_type(attrib, value)
# only store if not None, it may resolve to an actual value
# on subsequent calls
self._has_new_resolved = True
self._resolved_values[key] = value
return value

View File

@ -232,7 +232,13 @@ class CheckResource(object):
cnxt, self._rpc_client, req, current_traversal,
set(graph[(req, fwd)]), graph_key, input_data, fwd,
stack.adopt_stack_data)
if is_update:
if input_forward_data is None:
# we haven't resolved attribute data for the resource,
# so clear any old attributes so they may be re-resolved
rsrc.clear_stored_attributes()
else:
rsrc.store_attributes()
check_stack_complete(cnxt, stack, current_traversal,
graph_key[0], deps, graph_key[1])
except exception.EntityNotFound as e:

View File

@ -235,6 +235,7 @@ class Resource(status.ResourceStatus):
self.id = None
self.uuid = None
self._data = None
self._attr_data_id = None
self._rsrc_metadata = None
self._rsrc_prop_data = None
self._stored_properties_data = None
@ -280,6 +281,8 @@ class Resource(status.ResourceStatus):
self, resource.data)
except exception.NotFound:
self._data = {}
self.attributes.cached_attrs = resource.attr_data
self._attr_data_id = resource.attr_data_id
self._rsrc_metadata = resource.rsrc_metadata
self._stored_properties_data = resource.properties_data
self._rsrc_prop_data = resource.rsrc_prop_data
@ -912,6 +915,7 @@ class Resource(status.ResourceStatus):
self._stored_properties_data = function.resolve(self.properties.data)
if self._stored_properties_data != old_props:
self._rsrc_prop_data = None
self.attributes.reset_resolved_values()
def node_data(self):
def get_attrs(attrs):
@ -2083,13 +2087,38 @@ class Resource(status.ResourceStatus):
if new_state != old_state:
self._add_event(action, status, reason)
self.attributes.reset_resolved_values()
if status != self.COMPLETE:
self.clear_stored_attributes()
@property
def state(self):
"""Returns state, tuple of action, status."""
return (self.action, self.status)
def store_attributes(self):
assert self.id is not None
if self.status != self.COMPLETE or self.action in (self.INIT,
self.DELETE):
return
if not self.attributes.has_new_cached_attrs():
return
try:
attr_data_id = resource_objects.Resource.store_attributes(
self.context, self.id, self._atomic_key,
self.attributes.cached_attrs, self._attr_data_id)
if attr_data_id is not None:
self._attr_data_id = attr_data_id
except Exception as ex:
LOG.error('store_attributes rsrc %(name)s %(id)s DB error %(ex)s',
{'name': self.name, 'id': self.id, 'ex': ex})
def clear_stored_attributes(self):
if self._attr_data_id:
resource_objects.Resource.attr_data_delete(
self.context, self.id, self._attr_data_id)
self.attributes.reset_resolved_values()
def get_reference_id(self):
"""Default implementation for function get_resource.

View File

@ -249,6 +249,13 @@ class AutoScalingGroup(cooldown.CooldownMixin, instgrp.InstanceGroup):
min_adjustment_step,
lower, upper)
def resize(self, capacity):
try:
super(AutoScalingGroup, self).resize(capacity)
finally:
# allow InstanceList to be re-resolved
self.clear_stored_attributes()
def handle_update(self, json_snippet, tmpl_diff, prop_diff):
"""Updates self.properties, if Properties has changed.

View File

@ -148,6 +148,7 @@ class Pool(neutron.NeutronResource):
),
MEMBERS_ATTR: attributes.Schema(
_('Members associated with this pool.'),
cache_mode=attributes.Schema.CACHE_NONE,
type=attributes.Schema.LIST
),
}

View File

@ -526,8 +526,21 @@ class EngineService(service.ServiceBase):
else:
stacks = parser.Stack.load_all(cnxt)
return [api.format_stack(
retval = [api.format_stack(
stack, resolve_outputs=resolve_outputs) for stack in stacks]
if resolve_outputs:
# Cases where stored attributes may not exist for a resource:
# * For those resources that have attributes that were
# *not* referenced by other resources, their attributes
# are not resolved/stored over a stack update traversal
# * The resource is an AutoScalingGroup that received a signal
# * Near simultaneous updates (say by an update and a signal)
# * The first time resolving a pre-Pike stack
for stack in stacks:
if stack.convergence:
for res in six.itervalues(stack.resources):
res.store_attributes()
return retval
def get_revision(self, cnxt):
return cfg.CONF.revision['heat_revision']

View File

@ -111,7 +111,7 @@ class Resource(
resource['data'] = [resource_data.ResourceData._from_db_object(
resource_data.ResourceData(context), resd
) for resd in db_resource.data]
else:
elif field != 'attr_data':
resource[field] = db_resource[field]
if db_resource['rsrc_prop_data'] is not None:
@ -139,15 +139,20 @@ class Resource(
resource._properties_data = {}
if db_resource['attr_data'] is not None:
resource['attr_data'] = \
rpd.ResourcePropertiesData._from_db_object(
rpd.ResourcePropertiesData(context), context,
db_resource['attr_data'])
resource._attr_data = rpd.ResourcePropertiesData._from_db_object(
rpd.ResourcePropertiesData(context), context,
db_resource['attr_data']).data
else:
resource._attr_data = None
resource._context = context
resource.obj_reset_changes()
return resource
@property
def attr_data(self):
return self._attr_data
@property
def properties_data(self):
return self._properties_data
@ -185,6 +190,10 @@ class Resource(
def delete(cls, context, resource_id):
db_api.resource_delete(context, resource_id)
@classmethod
def attr_data_delete(cls, context, resource_id, attr_id):
db_api.resource_attr_data_delete(context, resource_id, attr_id)
@classmethod
def exchange_stacks(cls, context, resource_id1, resource_id2):
return db_api.resource_exchange_stacks(
@ -278,6 +287,16 @@ class Resource(
atomic_key=atomic_key,
expected_engine_id=expected_engine_id)
@classmethod
def store_attributes(cls, context, resource_id, atomic_key,
attr_data, attr_id):
attr_id = rpd.ResourcePropertiesData.create_or_update(
context, attr_data, attr_id).id
if db_api.resource_attr_id_set(
context, resource_id, atomic_key, attr_id):
return attr_id
return None
def refresh(self):
resource_db = db_api.resource_get(self._context, self.id, refresh=True)
return self.__class__._from_db_object(

View File

@ -55,14 +55,19 @@ class ResourcePropertiesData(
return rpd
@classmethod
def create(cls, context, data):
def create_or_update(cls, context, data, rpd_id=None):
properties_data_encrypted, properties_data = \
ResourcePropertiesData.encrypt_properties_data(data)
values = {'encrypted': properties_data_encrypted,
'data': properties_data}
db_obj = db_api.resource_prop_data_create(context, values)
db_obj = db_api.resource_prop_data_create_or_update(
context, values, rpd_id)
return cls._from_db_object(cls(), context, db_obj, data)
@classmethod
def create(cls, context, data):
return ResourcePropertiesData.create_or_update(context, data)
@staticmethod
def encrypt_properties_data(data):
if cfg.CONF.encrypt_parameters_and_properties and data:

View File

@ -252,7 +252,8 @@ class EventTest(EventCommon):
cfg.CONF.set_override('encrypt_parameters_and_properties', True)
data = {'p1': 'hello',
'p2': 'too soon?'}
rpd_obj = rpd_object.ResourcePropertiesData().create(self.ctx, data)
rpd_obj = rpd_object.ResourcePropertiesData().create_or_update(
self.ctx, data)
e_obj = event_object.Event().create(
self.ctx,
{'stack_id': self.stack.id,

View File

@ -609,11 +609,57 @@ class ResourceTest(common.HeatTestCase):
self.assertEqual(res_obj.status, res.COMPLETE)
self.assertRaises(AttributeError, getattr, res_obj, 'action')
def test_attributes_store(self):
res_def = rsrc_defn.ResourceDefinition('test_resource',
'ResWithStringPropAndAttr')
res = generic_rsrc.ResWithStringPropAndAttr(
'test_res_attr_store', res_def, self.stack)
res.action = res.CREATE
res.status = res.COMPLETE
res.store()
res.store_attributes()
# attr was not resolved, cache was not warmed, nothing to store
self.assertIsNone(res._attr_data_id)
with mock.patch.object(res, '_resolve_attribute') as res_attr:
attr_val = '0123 four'
res_attr.return_value = attr_val
res.attributes['string']
# attr cache is warmed, now store_attributes persists something
res.store_attributes()
self.assertIsNotNone(res._attr_data_id)
# verify the attribute rpd obj that was stored matches
self.assertEqual({'string': attr_val},
rpd_object.ResourcePropertiesData.get_by_id(
res.context, res._attr_data_id).data)
def test_attributes_load_stored(self):
res_def = rsrc_defn.ResourceDefinition('test_resource',
'ResWithStringPropAndAttr')
res = generic_rsrc.ResWithStringPropAndAttr(
'test_res_attr_store', res_def, self.stack)
res.action = res.UPDATE
res.status = res.COMPLETE
res.store()
attr_data = {'string': 'word'}
resource_objects.Resource.store_attributes(
res.context, res.id, res._atomic_key, attr_data, None)
res._load_data(resource_objects.Resource.get_obj(
res.context, res.id))
with mock.patch.object(res, '_resolve_attribute') as res_attr:
self.assertEqual(attr_data, res.attributes._resolved_values)
self.assertEqual('word', res.attributes['string'])
self.assertEqual(0, res_attr.call_count)
def test_resource_object_resource_properties_data(self):
cfg.CONF.set_override('encrypt_parameters_and_properties', True)
data = {'p1': 'i see',
'p2': 'good times, good times'}
rpd_obj = rpd_object.ResourcePropertiesData().create(
rpd_obj = rpd_object.ResourcePropertiesData().create_or_update(
self.stack.context, data)
rpd_db_obj = self.stack.context.session.query(
models.ResourcePropertiesData).get(rpd_obj.id)

View File

@ -31,8 +31,8 @@ class ResourcePropertiesDataTest(common.HeatTestCase):
'prop5': True}
def _get_rpd_and_db_obj(self):
rpd_obj = rpd_object.ResourcePropertiesData().create(self.ctx,
self.data)
rpd_obj = rpd_object.ResourcePropertiesData().create_or_update(
self.ctx, self.data)
db_obj = self.ctx.session.query(
models.ResourcePropertiesData).get(rpd_obj.id)
self.assertEqual(len(self.data), len(db_obj['data']))