From 04c8faba495b3d0d8ad0c8a1fa174b0ef1357b8d Mon Sep 17 00:00:00 2001 From: Stephen Finucane Date: Mon, 19 Jun 2023 11:12:25 +0100 Subject: [PATCH] db: Removing aliasing of context.session This will make a future change easier to grok. Signed-off-by: Stephen Finucane Change-Id: I0eefa245e827f951fc2333f64670f19100cbcc8a --- heat/db/api.py | 221 +++++++++++++++++++++++-------------------------- 1 file changed, 103 insertions(+), 118 deletions(-) diff --git a/heat/db/api.py b/heat/db/api.py index f4484516fa..feec7d0530 100644 --- a/heat/db/api.py +++ b/heat/db/api.py @@ -180,14 +180,13 @@ def raw_template_delete(context, template_id): # Ignore not found return raw_tmpl_files_id = raw_template.files_id - session = context.session - with session.begin(subtransactions=True): - session.delete(raw_template) + with context.session.begin(subtransactions=True): + context.session.delete(raw_template) if raw_tmpl_files_id is None: return # If no other raw_template is referencing the same raw_template_files, # delete that too - if session.query(models.RawTemplate).filter_by( + if context.session.query(models.RawTemplate).filter_by( files_id=raw_tmpl_files_id).first() is None: try: raw_tmpl_files = raw_template_files_get( @@ -195,15 +194,14 @@ def raw_template_delete(context, template_id): except exception.NotFound: # Ignore not found return - session.delete(raw_tmpl_files) + context.session.delete(raw_tmpl_files) def raw_template_files_create(context, values): - session = context.session raw_templ_files_ref = models.RawTemplateFiles() raw_templ_files_ref.update(values) - with session.begin(): - raw_templ_files_ref.save(session) + with context.session.begin(): + raw_templ_files_ref.save(context.session) return raw_templ_files_ref @@ -307,10 +305,9 @@ def resource_update(context, resource_id, values, atomic_key, def _try_resource_update(context, resource_id, values, atomic_key, expected_engine_id=None): - session = context.session - with session.begin(subtransactions=True): + with context.session.begin(subtransactions=True): _add_atomic_key_to_values(values, atomic_key) - rows_updated = session.query(models.Resource).filter_by( + rows_updated = context.session.query(models.Resource).filter_by( id=resource_id, engine_id=expected_engine_id, atomic_key=atomic_key).update(values) @@ -323,23 +320,21 @@ def resource_update_and_save(context, resource_id, values): def resource_delete(context, resource_id): - session = context.session - with session.begin(subtransactions=True): - resource = session.get(models.Resource, resource_id) + with context.session.begin(subtransactions=True): + resource = context.session.get(models.Resource, resource_id) if resource: - session.delete(resource) + context.session.delete(resource) if resource.attr_data_id is not None: - attr_prop_data = session.get( + attr_prop_data = context.session.get( models.ResourcePropertiesData, resource.attr_data_id) - session.delete(attr_prop_data) + context.session.delete(attr_prop_data) def resource_attr_id_set(context, resource_id, atomic_key, attr_id): - session = context.session - with session.begin(): + with context.session.begin(): values = {'attr_data_id': attr_id} _add_atomic_key_to_values(values, atomic_key) - rows_updated = session.query(models.Resource).filter(and_( + rows_updated = context.session.query(models.Resource).filter(and_( models.Resource.id == resource_id, models.Resource.atomic_key == atomic_key, models.Resource.engine_id.is_(None), @@ -354,22 +349,21 @@ def resource_attr_id_set(context, resource_id, atomic_key, attr_id): # resource_properties_data (attr) DB row. LOG.debug('Not updating res_id %(rid)s with attr_id %(aid)s', {'rid': resource_id, 'aid': attr_id}) - session.query( + context.session.query( models.ResourcePropertiesData).filter( models.ResourcePropertiesData.id == attr_id).delete() return False def resource_attr_data_delete(context, resource_id, attr_id): - session = context.session - with session.begin(): - resource = session.get(models.Resource, resource_id) - attr_prop_data = session.get( + with context.session.begin(): + resource = context.session.get(models.Resource, resource_id) + attr_prop_data = context.session.get( models.ResourcePropertiesData, attr_id) if resource: resource.update({'attr_data_id': None}) if attr_prop_data: - session.delete(attr_prop_data) + context.session.delete(attr_prop_data) def resource_data_get_all(context, resource_id, data=None): @@ -413,26 +407,24 @@ def resource_data_get(context, resource_id, key): def stack_tags_set(context, stack_id, tags): - session = context.session - with session.begin(): + with context.session.begin(): stack_tags_delete(context, stack_id) result = [] for tag in tags: stack_tag = models.StackTag() stack_tag.tag = tag stack_tag.stack_id = stack_id - stack_tag.save(session=session) + stack_tag.save(session=context.session) result.append(stack_tag) return result or None def stack_tags_delete(context, stack_id): - session = context.session - with session.begin(subtransactions=True): + with context.session.begin(subtransactions=True): result = stack_tags_get(context, stack_id) if result: for tag in result: - session.delete(tag) + context.session.delete(tag) def stack_tags_get(context, stack_id): @@ -485,9 +477,8 @@ def resource_exchange_stacks(context, resource_id1, resource_id2): def resource_data_delete(context, resource_id, key): result = resource_data_get_by_key(context, resource_id, key) - session = context.session - with session.begin(): - session.delete(result) + with context.session.begin(): + context.session.delete(result) def resource_create(context, values): @@ -502,9 +493,8 @@ def resource_create_replacement(context, existing_res_id, new_res_values, atomic_key, expected_engine_id=None): - session = context.session try: - with session.begin(): + with context.session.begin(): new_res = resource_create(context, new_res_values) update_data = {'replaced_by': new_res.id} if not _try_resource_update(context, @@ -838,9 +828,8 @@ def stack_create(context, values): @retry_on_db_error def stack_update(context, stack_id, values, exp_trvsl=None): - session = context.session - with session.begin(subtransactions=True): - query = (session.query(models.Stack) + with context.session.begin(subtransactions=True): + query = (context.session.query(models.Stack) .filter(and_(models.Stack.id == stack_id), (models.Stack.deleted_at.is_(None)))) if not context.is_admin: @@ -862,7 +851,7 @@ def stack_update(context, stack_id, values, exp_trvsl=None): '%(id)s %(msg)s') % { 'id': stack_id, 'msg': 'that does not exist'}) - session.expire_all() + context.session.expire_all() return (rows_updated is not None and rows_updated > 0) @@ -873,16 +862,15 @@ def stack_delete(context, stack_id): '%(id)s %(msg)s') % { 'id': stack_id, 'msg': 'that does not exist'}) - session = context.session - with session.begin(): + with context.session.begin(): attr_ids = [] # normally the resources are deleted already by this point for r in s.resources: if r.attr_data_id is not None: attr_ids.append(r.attr_data_id) - session.delete(r) + context.session.delete(r) if attr_ids: - session.query( + context.session.query( models.ResourcePropertiesData.id).filter( models.ResourcePropertiesData.id.in_(attr_ids)).delete( synchronize_session=False) @@ -914,17 +902,16 @@ def stack_lock_get_engine_id(context, stack_id): def persist_state_and_release_lock(context, stack_id, engine_id, values): - session = context.session - with session.begin(): - rows_updated = (session.query(models.Stack) + with context.session.begin(): + rows_updated = (context.session.query(models.Stack) .filter(models.Stack.id == stack_id) .update(values, synchronize_session=False)) rows_affected = None if rows_updated is not None and rows_updated > 0: - rows_affected = session.query( + rows_affected = context.session.query( models.StackLock ).filter_by(stack_id=stack_id, engine_id=engine_id).delete() - session.expire_all() + context.session.expire_all() if not rows_affected: return True @@ -1149,8 +1136,7 @@ def _delete_event_rows(context, stack_id, limit): # So we must manually supply the IN() values. # pgsql SHOULD work with the pure DELETE/JOIN below but that must be # confirmed via integration tests. - session = context.session - with session.begin(): + with context.session.begin(): query = _query_all_events_by_stack(context, stack_id) query = query.order_by(models.Event.id).limit(limit) id_pairs = [(e.id, e.rsrc_prop_data_id) for e in query.all()] @@ -1159,7 +1145,7 @@ def _delete_event_rows(context, stack_id, limit): (ids, rsrc_prop_ids) = zip(*id_pairs) max_id = ids[-1] # delete the events - retval = session.query(models.Event).filter( + retval = context.session.query(models.Event).filter( models.Event.id <= max_id).filter( models.Event.stack_id == stack_id).delete() @@ -1167,7 +1153,7 @@ def _delete_event_rows(context, stack_id, limit): def del_rpd(rpd_ids): if not rpd_ids: return - q_rpd = session.query(models.ResourcePropertiesData) + q_rpd = context.session.query(models.ResourcePropertiesData) q_rpd = q_rpd.filter(models.ResourcePropertiesData.id.in_(rpd_ids)) q_rpd.delete(synchronize_session=False) @@ -1257,10 +1243,9 @@ def software_config_delete(context, config_id): def software_deployment_create(context, values): obj_ref = models.SoftwareDeployment() obj_ref.update(values) - session = context.session - with session.begin(): - obj_ref.save(session) + with context.session.begin(): + obj_ref.save(context.session) return obj_ref @@ -1299,9 +1284,8 @@ def software_deployment_update(context, deployment_id, values): def software_deployment_delete(context, deployment_id): deployment = software_deployment_get(context, deployment_id) - session = context.session - with session.begin(): - session.delete(deployment) + with context.session.begin(): + context.session.delete(deployment) def snapshot_create(context, values): @@ -1367,12 +1351,11 @@ def service_update(context, service_id, values): def service_delete(context, service_id, soft_delete=True): service = service_get(context, service_id) - session = context.session - with session.begin(): + with context.session.begin(): if soft_delete: delete_softly(context, service) else: - session.delete(service) + context.session.delete(service) def service_get(context, service_id): @@ -1702,17 +1685,18 @@ def _crypt_action(encrypt): def _db_encrypt_or_decrypt_template_params( - ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False): + context, encryption_key, encrypt=False, batch_size=50, verbose=False, +): from heat.engine import template - session = ctxt.session excs = [] - query = session.query(models.RawTemplate) + query = context.session.query(models.RawTemplate) template_batches = _get_batch( - session, ctxt=ctxt, query=query, model=models.RawTemplate, + context.session, context=context, query=query, + model=models.RawTemplate, batch_size=batch_size) next_batch = list(itertools.islice(template_batches, batch_size)) while next_batch: - with session.begin(): + with context.session.begin(): for raw_template in next_batch: try: if verbose: @@ -1732,7 +1716,7 @@ def _db_encrypt_or_decrypt_template_params( if encrypt: tmpl = template.Template.load( - ctxt, raw_template.id, raw_template) + context, raw_template.id, raw_template) param_schemata = tmpl.param_schemata() if not param_schemata: continue @@ -1762,7 +1746,7 @@ def _db_encrypt_or_decrypt_template_params( newenv['encrypted_param_names'] = [] if needs_update: - raw_template_update(ctxt, raw_template.id, + raw_template_update(context, raw_template.id, {'environment': newenv}) except Exception as exc: LOG.exception('Failed to %(crypt_action)s parameters ' @@ -1782,20 +1766,21 @@ def _db_encrypt_or_decrypt_template_params( def _db_encrypt_or_decrypt_resource_prop_data_legacy( - ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False): - session = ctxt.session + context, encryption_key, encrypt=False, batch_size=50, verbose=False, +): excs = [] # Older resources may have properties_data in the legacy column, # so update those as needed - query = session.query(models.Resource).filter( + query = context.session.query(models.Resource).filter( models.Resource.properties_data_encrypted.isnot(encrypt)) resource_batches = _get_batch( - session=session, ctxt=ctxt, query=query, model=models.Resource, + session=context.session, context=context, query=query, + model=models.Resource, batch_size=batch_size) next_batch = list(itertools.islice(resource_batches, batch_size)) while next_batch: - with session.begin(): + with context.session.begin(): for resource in next_batch: if not resource.properties_data: continue @@ -1809,7 +1794,7 @@ def _db_encrypt_or_decrypt_resource_prop_data_legacy( else: result = crypt.decrypted_dict(resource.properties_data, encryption_key) - resource_update(ctxt, resource.id, + resource_update(context, resource.id, {'properties_data': result, 'properties_data_encrypted': encrypt}, resource.atomic_key) @@ -1829,20 +1814,20 @@ def _db_encrypt_or_decrypt_resource_prop_data_legacy( def _db_encrypt_or_decrypt_resource_prop_data( - ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False): - session = ctxt.session + context, encryption_key, encrypt=False, batch_size=50, verbose=False, +): excs = [] # Older resources may have properties_data in the legacy column, # so update those as needed - query = session.query(models.ResourcePropertiesData).filter( + query = context.session.query(models.ResourcePropertiesData).filter( models.ResourcePropertiesData.encrypted.isnot(encrypt)) rpd_batches = _get_batch( - session=session, ctxt=ctxt, query=query, + session=context.session, context=context, query=query, model=models.ResourcePropertiesData, batch_size=batch_size) next_batch = list(itertools.islice(rpd_batches, batch_size)) while next_batch: - with session.begin(): + with context.session.begin(): for rpd in next_batch: if not rpd.data: continue @@ -1875,11 +1860,12 @@ def _db_encrypt_or_decrypt_resource_prop_data( return excs -def db_encrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50, - verbose=False): +def db_encrypt_parameters_and_properties( + context, encryption_key, batch_size=50, verbose=False, +): """Encrypt parameters and properties for all templates in db. - :param ctxt: RPC context + :param context: RPC context :param encryption_key: key that will be used for parameter and property encryption :param batch_size: number of templates requested from DB in each iteration. @@ -1891,19 +1877,20 @@ def db_encrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50, """ excs = [] excs.extend(_db_encrypt_or_decrypt_template_params( - ctxt, encryption_key, True, batch_size, verbose)) + context, encryption_key, True, batch_size, verbose)) excs.extend(_db_encrypt_or_decrypt_resource_prop_data( - ctxt, encryption_key, True, batch_size, verbose)) + context, encryption_key, True, batch_size, verbose)) excs.extend(_db_encrypt_or_decrypt_resource_prop_data_legacy( - ctxt, encryption_key, True, batch_size, verbose)) + context, encryption_key, True, batch_size, verbose)) return excs -def db_decrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50, - verbose=False): +def db_decrypt_parameters_and_properties( + context, encryption_key, batch_size=50, verbose=False, +): """Decrypt parameters and properties for all templates in db. - :param ctxt: RPC context + :param context: RPC context :param encryption_key: key that will be used for parameter and property decryption :param batch_size: number of templates requested from DB in each iteration. @@ -1915,33 +1902,32 @@ def db_decrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50, """ excs = [] excs.extend(_db_encrypt_or_decrypt_template_params( - ctxt, encryption_key, False, batch_size, verbose)) + context, encryption_key, False, batch_size, verbose)) excs.extend(_db_encrypt_or_decrypt_resource_prop_data( - ctxt, encryption_key, False, batch_size, verbose)) + context, encryption_key, False, batch_size, verbose)) excs.extend(_db_encrypt_or_decrypt_resource_prop_data_legacy( - ctxt, encryption_key, False, batch_size, verbose)) + context, encryption_key, False, batch_size, verbose)) return excs -def db_properties_data_migrate(ctxt, batch_size=50): +def db_properties_data_migrate(context, batch_size=50): """Migrate properties data from legacy columns to new location in db. - :param ctxt: RPC context + :param context: RPC context :param batch_size: number of templates requested from DB in each iteration. 50 means that heat requests 50 templates, encrypt them and proceed with next 50 items. """ - session = ctxt.session - query = session.query(models.Resource).filter(and_( + query = context.session.query(models.Resource).filter(and_( models.Resource.properties_data.isnot(None), models.Resource.rsrc_prop_data_id.is_(None))) resource_batches = _get_batch( - session=session, ctxt=ctxt, query=query, + session=context.session, context=context, query=query, model=models.Resource, batch_size=batch_size) next_batch = list(itertools.islice(resource_batches, batch_size)) while next_batch: - with session.begin(): + with context.session.begin(): for resource in next_batch: try: encrypted = resource.properties_data_encrypted @@ -1953,9 +1939,9 @@ def db_properties_data_migrate(ctxt, batch_size=50): resource.id) encrypted = False rsrc_prop_data = resource_prop_data_create( - ctxt, {'encrypted': encrypted, - 'data': resource.properties_data}) - resource_update(ctxt, resource.id, + context, {'encrypted': encrypted, + 'data': resource.properties_data}) + resource_update(context, resource.id, {'properties_data_encrypted': None, 'properties_data': None, 'rsrc_prop_data_id': rsrc_prop_data.id}, @@ -1966,21 +1952,21 @@ def db_properties_data_migrate(ctxt, batch_size=50): continue next_batch = list(itertools.islice(resource_batches, batch_size)) - query = session.query(models.Event).filter(and_( + query = context.session.query(models.Event).filter(and_( models.Event.resource_properties.isnot(None), models.Event.rsrc_prop_data_id.is_(None))) event_batches = _get_batch( - session=session, ctxt=ctxt, query=query, + session=context.session, context=context, query=query, model=models.Event, batch_size=batch_size) next_batch = list(itertools.islice(event_batches, batch_size)) while next_batch: - with session.begin(): + with context.session.begin(): for event in next_batch: try: prop_data = event.resource_properties rsrc_prop_data = resource_prop_data_create( - ctxt, {'encrypted': False, - 'data': prop_data}) + context, + {'encrypted': False, 'data': prop_data}) event.update({'resource_properties': None, 'rsrc_prop_data_id': rsrc_prop_data.id}) except Exception: @@ -1990,11 +1976,11 @@ def db_properties_data_migrate(ctxt, batch_size=50): next_batch = list(itertools.islice(event_batches, batch_size)) -def _get_batch(session, ctxt, query, model, batch_size=50): +def _get_batch(session, context, query, model, batch_size=50): last_batch_marker = None while True: results = _paginate_query( - context=ctxt, query=query, model=model, limit=batch_size, + context=context, query=query, model=model, limit=batch_size, marker=last_batch_marker).all() if not results: break @@ -2005,21 +1991,20 @@ def _get_batch(session, ctxt, query, model, batch_size=50): def reset_stack_status(context, stack_id, stack=None): - session = context.session if stack is None: - stack = session.get(models.Stack, stack_id) + stack = context.session.get(models.Stack, stack_id) if stack is None: raise exception.NotFound(_('Stack with id %s not found') % stack_id) - with session.begin(): - query = session.query(models.Resource).filter_by( + with context.session.begin(): + query = context.session.query(models.Resource).filter_by( status='IN_PROGRESS', stack_id=stack_id) query.update({'status': 'FAILED', 'status_reason': 'Stack status manually reset', 'engine_id': None}) - query = session.query(models.ResourceData) + query = context.session.query(models.ResourceData) query = query.join(models.Resource) query = query.filter_by(stack_id=stack_id) query = query.filter( @@ -2027,19 +2012,19 @@ def reset_stack_status(context, stack_id, stack=None): data_ids = [data.id for data in query] if data_ids: - query = session.query(models.ResourceData) + query = context.session.query(models.ResourceData) query = query.filter(models.ResourceData.id.in_(data_ids)) query.delete(synchronize_session='fetch') - query = session.query(models.Stack).filter_by(owner_id=stack_id) + query = context.session.query(models.Stack).filter_by(owner_id=stack_id) for child in query: reset_stack_status(context, child.id, child) - with session.begin(): + with context.session.begin(): if stack.status == 'IN_PROGRESS': stack.status = 'FAILED' stack.status_reason = 'Stack status manually reset' - session.query( + context.session.query( models.StackLock ).filter_by(stack_id=stack_id).delete()