db: Group related DB APIs

Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
Change-Id: I10b57246e44d5b6d75a0514508761d8de7adea6e
This commit is contained in:
Stephen Finucane 2023-09-14 11:34:35 +01:00 committed by Takashi Kajinami
parent 6b514e29d9
commit d1124ac268
1 changed files with 224 additions and 170 deletions

View File

@ -70,6 +70,9 @@ LOG = logging.getLogger(__name__)
db_context.configure(__autocommit=True)
# utility methods
def get_facade():
global _facade
if _facade is None:
@ -146,6 +149,9 @@ def _soft_delete_aware_query(context, *args, **kwargs):
return query
# raw template
def raw_template_get(context, template_id):
result = context.session.get(models.RawTemplate, template_id)
@ -198,6 +204,9 @@ def raw_template_delete(context, template_id):
context.session.delete(raw_tmpl_files)
# raw template files
def raw_template_files_create(context, values):
raw_templ_files_ref = models.RawTemplateFiles()
raw_templ_files_ref.update(values)
@ -215,6 +224,102 @@ def raw_template_files_get(context, files_id):
return result
# resource
def resource_create(context, values):
resource_ref = models.Resource()
resource_ref.update(values)
resource_ref.save(context.session)
return resource_ref
@retry_on_db_error
def resource_create_replacement(context,
existing_res_id,
new_res_values,
atomic_key, expected_engine_id=None):
try:
with context.session.begin():
new_res = resource_create(context, new_res_values)
update_data = {'replaced_by': new_res.id}
if not _try_resource_update(context,
existing_res_id, update_data,
atomic_key,
expected_engine_id=expected_engine_id):
data = {}
if 'name' in new_res_values:
data['resource_name'] = new_res_values['name']
raise exception.UpdateInProgress(**data)
except db_exception.DBReferenceError as exc:
# New template_id no longer exists
LOG.debug('Not creating replacement resource: %s', exc)
return None
else:
return new_res
def resource_get_all_by_stack(context, stack_id, filters=None):
query = context.session.query(
models.Resource
).filter_by(
stack_id=stack_id
).options(
orm.joinedload(models.Resource.data),
orm.joinedload(models.Resource.rsrc_prop_data),
)
query = db_filters.exact_filter(query, models.Resource, filters)
results = query.all()
return dict((res.name, res) for res in results)
def resource_get_all_active_by_stack(context, stack_id):
filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'}
subquery = context.session.query(models.Resource.id).filter_by(**filters)
results = context.session.query(models.Resource).filter_by(
stack_id=stack_id).filter(
models.Resource.id.notin_(subquery.scalar_subquery())
).options(orm.joinedload(models.Resource.data)).all()
return dict((res.id, res) for res in results)
def resource_get_all_by_root_stack(context, stack_id, filters=None,
stack_id_only=False):
query = context.session.query(
models.Resource
).filter_by(
root_stack_id=stack_id
)
if stack_id_only:
query = query.options(
orm.load_only(models.Resource.id, models.Resource.stack_id)
)
else:
query = query.options(
orm.joinedload(models.Resource.data),
orm.joinedload(models.Resource.rsrc_prop_data),
)
query = db_filters.exact_filter(query, models.Resource, filters)
results = query.all()
return dict((res.id, res) for res in results)
def engine_get_all_locked_by_stack(context, stack_id):
query = context.session.query(
func.distinct(models.Resource.engine_id)
).filter(
models.Resource.stack_id == stack_id,
models.Resource.engine_id.isnot(None))
return set(i[0] for i in query.all())
def resource_get(context, resource_id, refresh=False, refresh_data=False,
eager=True):
options = [orm.joinedload(models.Resource.data)]
@ -331,6 +436,14 @@ def resource_delete(context, resource_id):
context.session.delete(attr_prop_data)
def resource_exchange_stacks(context, resource_id1, resource_id2):
with context.session.begin():
res1 = context.session.get(models.Resource, resource_id1)
res2 = context.session.get(models.Resource, resource_id2)
res1.stack, res2.stack = res2.stack, res1.stack
def resource_attr_id_set(context, resource_id, atomic_key, attr_id):
with context.session.begin():
values = {'attr_data_id': attr_id}
@ -367,6 +480,9 @@ def resource_attr_data_delete(context, resource_id, attr_id):
context.session.delete(attr_prop_data)
# resource data
def resource_data_get_all(context, resource_id, data=None):
"""Looks up resource_data by resource.id.
@ -407,34 +523,6 @@ def resource_data_get(context, resource_id, key):
return result.value
def stack_tags_set(context, stack_id, tags):
with context.session.begin():
stack_tags_delete(context, stack_id)
result = []
for tag in tags:
stack_tag = models.StackTag()
stack_tag.tag = tag
stack_tag.stack_id = stack_id
stack_tag.save(session=context.session)
result.append(stack_tag)
return result or None
def stack_tags_delete(context, stack_id):
with transaction(context):
result = stack_tags_get(context, stack_id)
if result:
for tag in result:
context.session.delete(tag)
def stack_tags_get(context, stack_id):
result = (context.session.query(models.StackTag)
.filter_by(stack_id=stack_id)
.all())
return result or None
def resource_data_get_by_key(context, resource_id, key):
"""Looks up resource_data by resource_id and key.
@ -468,111 +556,13 @@ def resource_data_set(context, resource_id, key, value, redact=False):
return current
def resource_exchange_stacks(context, resource_id1, resource_id2):
with context.session.begin():
res1 = context.session.get(models.Resource, resource_id1)
res2 = context.session.get(models.Resource, resource_id2)
res1.stack, res2.stack = res2.stack, res1.stack
def resource_data_delete(context, resource_id, key):
result = resource_data_get_by_key(context, resource_id, key)
with context.session.begin():
context.session.delete(result)
def resource_create(context, values):
resource_ref = models.Resource()
resource_ref.update(values)
resource_ref.save(context.session)
return resource_ref
@retry_on_db_error
def resource_create_replacement(context,
existing_res_id,
new_res_values,
atomic_key, expected_engine_id=None):
try:
with context.session.begin():
new_res = resource_create(context, new_res_values)
update_data = {'replaced_by': new_res.id}
if not _try_resource_update(context,
existing_res_id, update_data,
atomic_key,
expected_engine_id=expected_engine_id):
data = {}
if 'name' in new_res_values:
data['resource_name'] = new_res_values['name']
raise exception.UpdateInProgress(**data)
except db_exception.DBReferenceError as exc:
# New template_id no longer exists
LOG.debug('Not creating replacement resource: %s', exc)
return None
else:
return new_res
def resource_get_all_by_stack(context, stack_id, filters=None):
query = context.session.query(
models.Resource
).filter_by(
stack_id=stack_id
).options(
orm.joinedload(models.Resource.data),
orm.joinedload(models.Resource.rsrc_prop_data),
)
query = db_filters.exact_filter(query, models.Resource, filters)
results = query.all()
return dict((res.name, res) for res in results)
def resource_get_all_active_by_stack(context, stack_id):
filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'}
subquery = context.session.query(models.Resource.id).filter_by(**filters)
results = context.session.query(models.Resource).filter_by(
stack_id=stack_id).filter(
models.Resource.id.notin_(subquery.scalar_subquery())
).options(orm.joinedload(models.Resource.data)).all()
return dict((res.id, res) for res in results)
def resource_get_all_by_root_stack(context, stack_id, filters=None,
stack_id_only=False):
query = context.session.query(
models.Resource
).filter_by(
root_stack_id=stack_id
)
if stack_id_only:
query = query.options(
orm.load_only(models.Resource.id, models.Resource.stack_id)
)
else:
query = query.options(
orm.joinedload(models.Resource.data),
orm.joinedload(models.Resource.rsrc_prop_data),
)
query = db_filters.exact_filter(query, models.Resource, filters)
results = query.all()
return dict((res.id, res) for res in results)
def engine_get_all_locked_by_stack(context, stack_id):
query = context.session.query(
func.distinct(models.Resource.engine_id)
).filter(
models.Resource.stack_id == stack_id,
models.Resource.engine_id.isnot(None))
return set(i[0] for i in query.all())
# resource properties data
def resource_prop_data_create_or_update(context, values, rpd_id=None):
@ -601,6 +591,9 @@ def resource_prop_data_get(context, resource_prop_data_id):
return result
# stack
def stack_get_by_name_and_owner_id(context, stack_name, owner_id):
query = _soft_delete_aware_query(
context, models.Stack
@ -880,6 +873,77 @@ def stack_delete(context, stack_id):
_soft_delete(context, s)
def reset_stack_status(context, stack_id, stack=None):
if stack is None:
stack = context.session.get(models.Stack, stack_id)
if stack is None:
raise exception.NotFound(_('Stack with id %s not found') % stack_id)
with context.session.begin():
query = context.session.query(models.Resource).filter_by(
status='IN_PROGRESS', stack_id=stack_id)
query.update({'status': 'FAILED',
'status_reason': 'Stack status manually reset',
'engine_id': None})
query = context.session.query(models.ResourceData)
query = query.join(models.Resource)
query = query.filter_by(stack_id=stack_id)
query = query.filter(
models.ResourceData.key.in_(heat_environment.HOOK_TYPES))
data_ids = [data.id for data in query]
if data_ids:
query = context.session.query(models.ResourceData)
query = query.filter(models.ResourceData.id.in_(data_ids))
query.delete(synchronize_session='fetch')
query = context.session.query(models.Stack).filter_by(owner_id=stack_id)
for child in query:
reset_stack_status(context, child.id, child)
with context.session.begin():
if stack.status == 'IN_PROGRESS':
stack.status = 'FAILED'
stack.status_reason = 'Stack status manually reset'
context.session.query(
models.StackLock
).filter_by(stack_id=stack_id).delete()
def stack_tags_set(context, stack_id, tags):
with context.session.begin():
stack_tags_delete(context, stack_id)
result = []
for tag in tags:
stack_tag = models.StackTag()
stack_tag.tag = tag
stack_tag.stack_id = stack_id
stack_tag.save(session=context.session)
result.append(stack_tag)
return result or None
def stack_tags_delete(context, stack_id):
with transaction(context):
result = stack_tags_get(context, stack_id)
if result:
for tag in result:
context.session.delete(tag)
def stack_tags_get(context, stack_id):
result = (context.session.query(models.StackTag)
.filter_by(stack_id=stack_id)
.all())
return result or None
# stack lock
def _is_duplicate_error(exc):
return isinstance(exc, db_exception.DBDuplicateEntry)
@ -955,6 +1019,9 @@ def stack_count_total_resources(context, stack_id):
).filter_by(root_stack_id=stack_id).scalar()
# user credentials
def user_creds_create(context):
values = context.to_dict()
user_creds_ref = models.UserCreds()
@ -1014,6 +1081,9 @@ def user_creds_delete(context, user_creds_id):
context.session.delete(creds)
# event
def event_get_all_by_tenant(context, limit=None, marker=None,
sort_keys=None, sort_dir=None, filters=None):
query = context.session.query(models.Event)
@ -1093,9 +1163,12 @@ def _find_rpd_references(context, stack_id):
stack_id=stack_id,
).all()
)
rsrc_ref_ids = set(r.rsrc_prop_data_id for r
in context.session.query(models.Resource).filter_by(
stack_id=stack_id).all())
rsrc_ref_ids = set(
r.rsrc_prop_data_id for r
in context.session.query(models.Resource).filter_by(
stack_id=stack_id,
).all()
)
return ev_ref_ids | rsrc_ref_ids
@ -1205,6 +1278,9 @@ def event_create(context, values):
return event_ref
# software config
def software_config_create(context, values):
obj_ref = models.SoftwareConfig()
obj_ref.update(values)
@ -1245,6 +1321,9 @@ def software_config_delete(context, config_id):
context.session.delete(config)
# software deployment
def software_deployment_create(context, values):
obj_ref = models.SoftwareDeployment()
obj_ref.update(values)
@ -1303,6 +1382,9 @@ def software_deployment_delete(context, deployment_id):
context.session.delete(deployment)
# snapshot
def snapshot_create(context, values):
obj_ref = models.Snapshot()
obj_ref.update(values)
@ -1349,6 +1431,9 @@ def snapshot_get_all(context, stack_id):
stack_id=stack_id, tenant=context.tenant_id)
# service
def service_create(context, values):
service = models.Service()
service.update(values)
@ -1392,6 +1477,9 @@ def service_get_all_by_args(context, host, binary, hostname):
filter_by(hostname=hostname).all())
# purge
def purge_deleted(age, granularity='days', project_id=None, batch_size=20):
def _validate_positive_integer(val, argname):
try:
@ -1654,6 +1742,9 @@ def _purge_stacks(stack_infos, engine, meta):
conn.execute(usr_creds_del)
# sync point
def sync_point_delete_all_by_stack_and_traversal(context, stack_id,
traversal_id):
with context.session.begin():
@ -1693,6 +1784,9 @@ def sync_point_update_input_data(context, entity_id,
return rows_updated
# data migration utils
def _crypt_action(encrypt):
if encrypt:
return _('encrypt')
@ -2004,43 +2098,3 @@ def _get_batch(session, context, query, model, batch_size=50):
for result in results:
yield result
last_batch_marker = results[-1].id
def reset_stack_status(context, stack_id, stack=None):
if stack is None:
stack = context.session.get(models.Stack, stack_id)
if stack is None:
raise exception.NotFound(_('Stack with id %s not found') % stack_id)
with context.session.begin():
query = context.session.query(models.Resource).filter_by(
status='IN_PROGRESS', stack_id=stack_id)
query.update({'status': 'FAILED',
'status_reason': 'Stack status manually reset',
'engine_id': None})
query = context.session.query(models.ResourceData)
query = query.join(models.Resource)
query = query.filter_by(stack_id=stack_id)
query = query.filter(
models.ResourceData.key.in_(heat_environment.HOOK_TYPES))
data_ids = [data.id for data in query]
if data_ids:
query = context.session.query(models.ResourceData)
query = query.filter(models.ResourceData.id.in_(data_ids))
query.delete(synchronize_session='fetch')
query = context.session.query(models.Stack).filter_by(owner_id=stack_id)
for child in query:
reset_stack_status(context, child.id, child)
with context.session.begin():
if stack.status == 'IN_PROGRESS':
stack.status = 'FAILED'
stack.status_reason = 'Stack status manually reset'
context.session.query(
models.StackLock
).filter_by(stack_id=stack_id).delete()