Merge "db: Removing aliasing of context.session"

This commit is contained in:
Zuul 2023-06-22 03:46:57 +00:00 committed by Gerrit Code Review
commit 2b42bf9788
1 changed files with 103 additions and 118 deletions

View File

@ -180,14 +180,13 @@ def raw_template_delete(context, template_id):
# Ignore not found
return
raw_tmpl_files_id = raw_template.files_id
session = context.session
with session.begin(subtransactions=True):
session.delete(raw_template)
with context.session.begin(subtransactions=True):
context.session.delete(raw_template)
if raw_tmpl_files_id is None:
return
# If no other raw_template is referencing the same raw_template_files,
# delete that too
if session.query(models.RawTemplate).filter_by(
if context.session.query(models.RawTemplate).filter_by(
files_id=raw_tmpl_files_id).first() is None:
try:
raw_tmpl_files = raw_template_files_get(
@ -195,15 +194,14 @@ def raw_template_delete(context, template_id):
except exception.NotFound:
# Ignore not found
return
session.delete(raw_tmpl_files)
context.session.delete(raw_tmpl_files)
def raw_template_files_create(context, values):
session = context.session
raw_templ_files_ref = models.RawTemplateFiles()
raw_templ_files_ref.update(values)
with session.begin():
raw_templ_files_ref.save(session)
with context.session.begin():
raw_templ_files_ref.save(context.session)
return raw_templ_files_ref
@ -307,10 +305,9 @@ def resource_update(context, resource_id, values, atomic_key,
def _try_resource_update(context, resource_id, values, atomic_key,
expected_engine_id=None):
session = context.session
with session.begin(subtransactions=True):
with context.session.begin(subtransactions=True):
_add_atomic_key_to_values(values, atomic_key)
rows_updated = session.query(models.Resource).filter_by(
rows_updated = context.session.query(models.Resource).filter_by(
id=resource_id, engine_id=expected_engine_id,
atomic_key=atomic_key).update(values)
@ -323,23 +320,21 @@ def resource_update_and_save(context, resource_id, values):
def resource_delete(context, resource_id):
session = context.session
with session.begin(subtransactions=True):
resource = session.get(models.Resource, resource_id)
with context.session.begin(subtransactions=True):
resource = context.session.get(models.Resource, resource_id)
if resource:
session.delete(resource)
context.session.delete(resource)
if resource.attr_data_id is not None:
attr_prop_data = session.get(
attr_prop_data = context.session.get(
models.ResourcePropertiesData, resource.attr_data_id)
session.delete(attr_prop_data)
context.session.delete(attr_prop_data)
def resource_attr_id_set(context, resource_id, atomic_key, attr_id):
session = context.session
with session.begin():
with context.session.begin():
values = {'attr_data_id': attr_id}
_add_atomic_key_to_values(values, atomic_key)
rows_updated = session.query(models.Resource).filter(and_(
rows_updated = context.session.query(models.Resource).filter(and_(
models.Resource.id == resource_id,
models.Resource.atomic_key == atomic_key,
models.Resource.engine_id.is_(None),
@ -354,22 +349,21 @@ def resource_attr_id_set(context, resource_id, atomic_key, attr_id):
# resource_properties_data (attr) DB row.
LOG.debug('Not updating res_id %(rid)s with attr_id %(aid)s',
{'rid': resource_id, 'aid': attr_id})
session.query(
context.session.query(
models.ResourcePropertiesData).filter(
models.ResourcePropertiesData.id == attr_id).delete()
return False
def resource_attr_data_delete(context, resource_id, attr_id):
session = context.session
with session.begin():
resource = session.get(models.Resource, resource_id)
attr_prop_data = session.get(
with context.session.begin():
resource = context.session.get(models.Resource, resource_id)
attr_prop_data = context.session.get(
models.ResourcePropertiesData, attr_id)
if resource:
resource.update({'attr_data_id': None})
if attr_prop_data:
session.delete(attr_prop_data)
context.session.delete(attr_prop_data)
def resource_data_get_all(context, resource_id, data=None):
@ -413,26 +407,24 @@ def resource_data_get(context, resource_id, key):
def stack_tags_set(context, stack_id, tags):
session = context.session
with session.begin():
with context.session.begin():
stack_tags_delete(context, stack_id)
result = []
for tag in tags:
stack_tag = models.StackTag()
stack_tag.tag = tag
stack_tag.stack_id = stack_id
stack_tag.save(session=session)
stack_tag.save(session=context.session)
result.append(stack_tag)
return result or None
def stack_tags_delete(context, stack_id):
session = context.session
with session.begin(subtransactions=True):
with context.session.begin(subtransactions=True):
result = stack_tags_get(context, stack_id)
if result:
for tag in result:
session.delete(tag)
context.session.delete(tag)
def stack_tags_get(context, stack_id):
@ -485,9 +477,8 @@ def resource_exchange_stacks(context, resource_id1, resource_id2):
def resource_data_delete(context, resource_id, key):
result = resource_data_get_by_key(context, resource_id, key)
session = context.session
with session.begin():
session.delete(result)
with context.session.begin():
context.session.delete(result)
def resource_create(context, values):
@ -502,9 +493,8 @@ def resource_create_replacement(context,
existing_res_id,
new_res_values,
atomic_key, expected_engine_id=None):
session = context.session
try:
with session.begin():
with context.session.begin():
new_res = resource_create(context, new_res_values)
update_data = {'replaced_by': new_res.id}
if not _try_resource_update(context,
@ -838,9 +828,8 @@ def stack_create(context, values):
@retry_on_db_error
def stack_update(context, stack_id, values, exp_trvsl=None):
session = context.session
with session.begin(subtransactions=True):
query = (session.query(models.Stack)
with context.session.begin(subtransactions=True):
query = (context.session.query(models.Stack)
.filter(and_(models.Stack.id == stack_id),
(models.Stack.deleted_at.is_(None))))
if not context.is_admin:
@ -862,7 +851,7 @@ def stack_update(context, stack_id, values, exp_trvsl=None):
'%(id)s %(msg)s') % {
'id': stack_id,
'msg': 'that does not exist'})
session.expire_all()
context.session.expire_all()
return (rows_updated is not None and rows_updated > 0)
@ -873,16 +862,15 @@ def stack_delete(context, stack_id):
'%(id)s %(msg)s') % {
'id': stack_id,
'msg': 'that does not exist'})
session = context.session
with session.begin():
with context.session.begin():
attr_ids = []
# normally the resources are deleted already by this point
for r in s.resources:
if r.attr_data_id is not None:
attr_ids.append(r.attr_data_id)
session.delete(r)
context.session.delete(r)
if attr_ids:
session.query(
context.session.query(
models.ResourcePropertiesData.id).filter(
models.ResourcePropertiesData.id.in_(attr_ids)).delete(
synchronize_session=False)
@ -914,17 +902,16 @@ def stack_lock_get_engine_id(context, stack_id):
def persist_state_and_release_lock(context, stack_id, engine_id, values):
session = context.session
with session.begin():
rows_updated = (session.query(models.Stack)
with context.session.begin():
rows_updated = (context.session.query(models.Stack)
.filter(models.Stack.id == stack_id)
.update(values, synchronize_session=False))
rows_affected = None
if rows_updated is not None and rows_updated > 0:
rows_affected = session.query(
rows_affected = context.session.query(
models.StackLock
).filter_by(stack_id=stack_id, engine_id=engine_id).delete()
session.expire_all()
context.session.expire_all()
if not rows_affected:
return True
@ -1149,8 +1136,7 @@ def _delete_event_rows(context, stack_id, limit):
# So we must manually supply the IN() values.
# pgsql SHOULD work with the pure DELETE/JOIN below but that must be
# confirmed via integration tests.
session = context.session
with session.begin():
with context.session.begin():
query = _query_all_events_by_stack(context, stack_id)
query = query.order_by(models.Event.id).limit(limit)
id_pairs = [(e.id, e.rsrc_prop_data_id) for e in query.all()]
@ -1159,7 +1145,7 @@ def _delete_event_rows(context, stack_id, limit):
(ids, rsrc_prop_ids) = zip(*id_pairs)
max_id = ids[-1]
# delete the events
retval = session.query(models.Event).filter(
retval = context.session.query(models.Event).filter(
models.Event.id <= max_id).filter(
models.Event.stack_id == stack_id).delete()
@ -1167,7 +1153,7 @@ def _delete_event_rows(context, stack_id, limit):
def del_rpd(rpd_ids):
if not rpd_ids:
return
q_rpd = session.query(models.ResourcePropertiesData)
q_rpd = context.session.query(models.ResourcePropertiesData)
q_rpd = q_rpd.filter(models.ResourcePropertiesData.id.in_(rpd_ids))
q_rpd.delete(synchronize_session=False)
@ -1257,10 +1243,9 @@ def software_config_delete(context, config_id):
def software_deployment_create(context, values):
obj_ref = models.SoftwareDeployment()
obj_ref.update(values)
session = context.session
with session.begin():
obj_ref.save(session)
with context.session.begin():
obj_ref.save(context.session)
return obj_ref
@ -1299,9 +1284,8 @@ def software_deployment_update(context, deployment_id, values):
def software_deployment_delete(context, deployment_id):
deployment = software_deployment_get(context, deployment_id)
session = context.session
with session.begin():
session.delete(deployment)
with context.session.begin():
context.session.delete(deployment)
def snapshot_create(context, values):
@ -1367,12 +1351,11 @@ def service_update(context, service_id, values):
def service_delete(context, service_id, soft_delete=True):
service = service_get(context, service_id)
session = context.session
with session.begin():
with context.session.begin():
if soft_delete:
delete_softly(context, service)
else:
session.delete(service)
context.session.delete(service)
def service_get(context, service_id):
@ -1702,17 +1685,18 @@ def _crypt_action(encrypt):
def _db_encrypt_or_decrypt_template_params(
ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False):
context, encryption_key, encrypt=False, batch_size=50, verbose=False,
):
from heat.engine import template
session = ctxt.session
excs = []
query = session.query(models.RawTemplate)
query = context.session.query(models.RawTemplate)
template_batches = _get_batch(
session, ctxt=ctxt, query=query, model=models.RawTemplate,
context.session, context=context, query=query,
model=models.RawTemplate,
batch_size=batch_size)
next_batch = list(itertools.islice(template_batches, batch_size))
while next_batch:
with session.begin():
with context.session.begin():
for raw_template in next_batch:
try:
if verbose:
@ -1732,7 +1716,7 @@ def _db_encrypt_or_decrypt_template_params(
if encrypt:
tmpl = template.Template.load(
ctxt, raw_template.id, raw_template)
context, raw_template.id, raw_template)
param_schemata = tmpl.param_schemata()
if not param_schemata:
continue
@ -1762,7 +1746,7 @@ def _db_encrypt_or_decrypt_template_params(
newenv['encrypted_param_names'] = []
if needs_update:
raw_template_update(ctxt, raw_template.id,
raw_template_update(context, raw_template.id,
{'environment': newenv})
except Exception as exc:
LOG.exception('Failed to %(crypt_action)s parameters '
@ -1782,20 +1766,21 @@ def _db_encrypt_or_decrypt_template_params(
def _db_encrypt_or_decrypt_resource_prop_data_legacy(
ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False):
session = ctxt.session
context, encryption_key, encrypt=False, batch_size=50, verbose=False,
):
excs = []
# Older resources may have properties_data in the legacy column,
# so update those as needed
query = session.query(models.Resource).filter(
query = context.session.query(models.Resource).filter(
models.Resource.properties_data_encrypted.isnot(encrypt))
resource_batches = _get_batch(
session=session, ctxt=ctxt, query=query, model=models.Resource,
session=context.session, context=context, query=query,
model=models.Resource,
batch_size=batch_size)
next_batch = list(itertools.islice(resource_batches, batch_size))
while next_batch:
with session.begin():
with context.session.begin():
for resource in next_batch:
if not resource.properties_data:
continue
@ -1809,7 +1794,7 @@ def _db_encrypt_or_decrypt_resource_prop_data_legacy(
else:
result = crypt.decrypted_dict(resource.properties_data,
encryption_key)
resource_update(ctxt, resource.id,
resource_update(context, resource.id,
{'properties_data': result,
'properties_data_encrypted': encrypt},
resource.atomic_key)
@ -1829,20 +1814,20 @@ def _db_encrypt_or_decrypt_resource_prop_data_legacy(
def _db_encrypt_or_decrypt_resource_prop_data(
ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False):
session = ctxt.session
context, encryption_key, encrypt=False, batch_size=50, verbose=False,
):
excs = []
# Older resources may have properties_data in the legacy column,
# so update those as needed
query = session.query(models.ResourcePropertiesData).filter(
query = context.session.query(models.ResourcePropertiesData).filter(
models.ResourcePropertiesData.encrypted.isnot(encrypt))
rpd_batches = _get_batch(
session=session, ctxt=ctxt, query=query,
session=context.session, context=context, query=query,
model=models.ResourcePropertiesData, batch_size=batch_size)
next_batch = list(itertools.islice(rpd_batches, batch_size))
while next_batch:
with session.begin():
with context.session.begin():
for rpd in next_batch:
if not rpd.data:
continue
@ -1875,11 +1860,12 @@ def _db_encrypt_or_decrypt_resource_prop_data(
return excs
def db_encrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50,
verbose=False):
def db_encrypt_parameters_and_properties(
context, encryption_key, batch_size=50, verbose=False,
):
"""Encrypt parameters and properties for all templates in db.
:param ctxt: RPC context
:param context: RPC context
:param encryption_key: key that will be used for parameter and property
encryption
:param batch_size: number of templates requested from DB in each iteration.
@ -1891,19 +1877,20 @@ def db_encrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50,
"""
excs = []
excs.extend(_db_encrypt_or_decrypt_template_params(
ctxt, encryption_key, True, batch_size, verbose))
context, encryption_key, True, batch_size, verbose))
excs.extend(_db_encrypt_or_decrypt_resource_prop_data(
ctxt, encryption_key, True, batch_size, verbose))
context, encryption_key, True, batch_size, verbose))
excs.extend(_db_encrypt_or_decrypt_resource_prop_data_legacy(
ctxt, encryption_key, True, batch_size, verbose))
context, encryption_key, True, batch_size, verbose))
return excs
def db_decrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50,
verbose=False):
def db_decrypt_parameters_and_properties(
context, encryption_key, batch_size=50, verbose=False,
):
"""Decrypt parameters and properties for all templates in db.
:param ctxt: RPC context
:param context: RPC context
:param encryption_key: key that will be used for parameter and property
decryption
:param batch_size: number of templates requested from DB in each iteration.
@ -1915,33 +1902,32 @@ def db_decrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50,
"""
excs = []
excs.extend(_db_encrypt_or_decrypt_template_params(
ctxt, encryption_key, False, batch_size, verbose))
context, encryption_key, False, batch_size, verbose))
excs.extend(_db_encrypt_or_decrypt_resource_prop_data(
ctxt, encryption_key, False, batch_size, verbose))
context, encryption_key, False, batch_size, verbose))
excs.extend(_db_encrypt_or_decrypt_resource_prop_data_legacy(
ctxt, encryption_key, False, batch_size, verbose))
context, encryption_key, False, batch_size, verbose))
return excs
def db_properties_data_migrate(ctxt, batch_size=50):
def db_properties_data_migrate(context, batch_size=50):
"""Migrate properties data from legacy columns to new location in db.
:param ctxt: RPC context
:param context: RPC context
:param batch_size: number of templates requested from DB in each iteration.
50 means that heat requests 50 templates, encrypt them
and proceed with next 50 items.
"""
session = ctxt.session
query = session.query(models.Resource).filter(and_(
query = context.session.query(models.Resource).filter(and_(
models.Resource.properties_data.isnot(None),
models.Resource.rsrc_prop_data_id.is_(None)))
resource_batches = _get_batch(
session=session, ctxt=ctxt, query=query,
session=context.session, context=context, query=query,
model=models.Resource, batch_size=batch_size)
next_batch = list(itertools.islice(resource_batches, batch_size))
while next_batch:
with session.begin():
with context.session.begin():
for resource in next_batch:
try:
encrypted = resource.properties_data_encrypted
@ -1953,9 +1939,9 @@ def db_properties_data_migrate(ctxt, batch_size=50):
resource.id)
encrypted = False
rsrc_prop_data = resource_prop_data_create(
ctxt, {'encrypted': encrypted,
'data': resource.properties_data})
resource_update(ctxt, resource.id,
context, {'encrypted': encrypted,
'data': resource.properties_data})
resource_update(context, resource.id,
{'properties_data_encrypted': None,
'properties_data': None,
'rsrc_prop_data_id': rsrc_prop_data.id},
@ -1966,21 +1952,21 @@ def db_properties_data_migrate(ctxt, batch_size=50):
continue
next_batch = list(itertools.islice(resource_batches, batch_size))
query = session.query(models.Event).filter(and_(
query = context.session.query(models.Event).filter(and_(
models.Event.resource_properties.isnot(None),
models.Event.rsrc_prop_data_id.is_(None)))
event_batches = _get_batch(
session=session, ctxt=ctxt, query=query,
session=context.session, context=context, query=query,
model=models.Event, batch_size=batch_size)
next_batch = list(itertools.islice(event_batches, batch_size))
while next_batch:
with session.begin():
with context.session.begin():
for event in next_batch:
try:
prop_data = event.resource_properties
rsrc_prop_data = resource_prop_data_create(
ctxt, {'encrypted': False,
'data': prop_data})
context,
{'encrypted': False, 'data': prop_data})
event.update({'resource_properties': None,
'rsrc_prop_data_id': rsrc_prop_data.id})
except Exception:
@ -1990,11 +1976,11 @@ def db_properties_data_migrate(ctxt, batch_size=50):
next_batch = list(itertools.islice(event_batches, batch_size))
def _get_batch(session, ctxt, query, model, batch_size=50):
def _get_batch(session, context, query, model, batch_size=50):
last_batch_marker = None
while True:
results = _paginate_query(
context=ctxt, query=query, model=model, limit=batch_size,
context=context, query=query, model=model, limit=batch_size,
marker=last_batch_marker).all()
if not results:
break
@ -2005,21 +1991,20 @@ def _get_batch(session, ctxt, query, model, batch_size=50):
def reset_stack_status(context, stack_id, stack=None):
session = context.session
if stack is None:
stack = session.get(models.Stack, stack_id)
stack = context.session.get(models.Stack, stack_id)
if stack is None:
raise exception.NotFound(_('Stack with id %s not found') % stack_id)
with session.begin():
query = session.query(models.Resource).filter_by(
with context.session.begin():
query = context.session.query(models.Resource).filter_by(
status='IN_PROGRESS', stack_id=stack_id)
query.update({'status': 'FAILED',
'status_reason': 'Stack status manually reset',
'engine_id': None})
query = session.query(models.ResourceData)
query = context.session.query(models.ResourceData)
query = query.join(models.Resource)
query = query.filter_by(stack_id=stack_id)
query = query.filter(
@ -2027,19 +2012,19 @@ def reset_stack_status(context, stack_id, stack=None):
data_ids = [data.id for data in query]
if data_ids:
query = session.query(models.ResourceData)
query = context.session.query(models.ResourceData)
query = query.filter(models.ResourceData.id.in_(data_ids))
query.delete(synchronize_session='fetch')
query = session.query(models.Stack).filter_by(owner_id=stack_id)
query = context.session.query(models.Stack).filter_by(owner_id=stack_id)
for child in query:
reset_stack_status(context, child.id, child)
with session.begin():
with context.session.begin():
if stack.status == 'IN_PROGRESS':
stack.status = 'FAILED'
stack.status_reason = 'Stack status manually reset'
session.query(
context.session.query(
models.StackLock
).filter_by(stack_id=stack_id).delete()