|
|
@ -1052,7 +1052,6 @@ def quota_usage_get_all_by_project(context, project_id): |
|
|
|
@require_admin_context |
|
|
|
def _quota_usage_create(context, project_id, resource, in_use, reserved, |
|
|
|
until_refresh, session=None): |
|
|
|
|
|
|
|
quota_usage_ref = models.QuotaUsage() |
|
|
|
quota_usage_ref.project_id = project_id |
|
|
|
quota_usage_ref.resource = resource |
|
|
@ -1124,36 +1123,93 @@ def quota_usage_update_resource(context, old_res, new_res): |
|
|
|
usage.until_refresh = 1 |
|
|
|
|
|
|
|
|
|
|
|
@require_context |
|
|
|
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) |
|
|
|
def _is_duplicate(exc): |
|
|
|
"""Check if an exception is caused by a unique constraint failure.""" |
|
|
|
return isinstance(exc, db_exc.DBDuplicateEntry) |
|
|
|
|
|
|
|
|
|
|
|
def _get_sync_updates(ctxt, project_id, session, resources, resource_name): |
|
|
|
"""Return usage for a specific resource. |
|
|
|
|
|
|
|
Resources are volumes, gigabytes, backups, snapshots, and also |
|
|
|
volumes_<type_name> snapshots_<type_name> for each volume type. |
|
|
|
""" |
|
|
|
# Grab the sync routine |
|
|
|
sync = QUOTA_SYNC_FUNCTIONS[resources[resource_name].sync] |
|
|
|
# VolumeTypeResource includes the id and name of the resource. |
|
|
|
volume_type_id = getattr(resources[resource_name], |
|
|
|
'volume_type_id', None) |
|
|
|
volume_type_name = getattr(resources[resource_name], |
|
|
|
'volume_type_name', None) |
|
|
|
updates = sync(ctxt, project_id, |
|
|
|
volume_type_id=volume_type_id, |
|
|
|
volume_type_name=volume_type_name, |
|
|
|
session=session) |
|
|
|
return updates |
|
|
|
|
|
|
|
|
|
|
|
@require_context |
|
|
|
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True, |
|
|
|
exception_checker=_is_duplicate) |
|
|
|
def quota_reserve(context, resources, quotas, deltas, expire, |
|
|
|
until_refresh, max_age, project_id=None): |
|
|
|
elevated = context.elevated() |
|
|
|
session = get_session() |
|
|
|
with session.begin(): |
|
|
|
|
|
|
|
# We don't use begin as a context manager because there are cases where we |
|
|
|
# want to finish a transaction and begin a new one. |
|
|
|
session.begin() |
|
|
|
try: |
|
|
|
if project_id is None: |
|
|
|
project_id = context.project_id |
|
|
|
|
|
|
|
# Get the current usages |
|
|
|
usages = _get_quota_usages(context, session, project_id, |
|
|
|
resources=deltas.keys()) |
|
|
|
# Loop until we can lock all the resource rows we'll be modifying |
|
|
|
while True: |
|
|
|
# Get the current usages and lock existing rows |
|
|
|
usages = _get_quota_usages(context, session, project_id, |
|
|
|
resources=deltas.keys()) |
|
|
|
missing = [res for res in deltas if res not in usages] |
|
|
|
# If we have successfully locked all the rows we can continue. |
|
|
|
# SELECT ... FOR UPDATE used in _get_quota usages cannot lock |
|
|
|
# non-existing rows, so there can be races with other requests |
|
|
|
# trying to create those rows. |
|
|
|
if not missing: |
|
|
|
break |
|
|
|
|
|
|
|
# Create missing rows calculating current values instead of |
|
|
|
# assuming there are no used resources as admins may have been |
|
|
|
# using this mechanism to force quota usage refresh. |
|
|
|
for resource in missing: |
|
|
|
updates = _get_sync_updates(elevated, project_id, session, |
|
|
|
resources, resource) |
|
|
|
_quota_usage_create(elevated, project_id, resource, |
|
|
|
updates[resource], 0, |
|
|
|
until_refresh or None, session=session) |
|
|
|
|
|
|
|
# NOTE: When doing the commit there can be a race condition with |
|
|
|
# other service instances or thread that are also creating the |
|
|
|
# same rows and in that case this will raise either a Deadlock |
|
|
|
# exception (when multiple transactions were creating the same rows |
|
|
|
# and the DB failed to acquire the row lock on the non-first |
|
|
|
# transaction) or a DBDuplicateEntry exception if some other |
|
|
|
# transaction created the row between us doing the |
|
|
|
# _get_quota_usages and here. In both cases this transaction will |
|
|
|
# be rolled back and the wrap_db_retry decorator will retry. |
|
|
|
|
|
|
|
# Commit new rows to the DB. |
|
|
|
session.commit() |
|
|
|
|
|
|
|
# Start a new session before trying to lock all the rows again. By |
|
|
|
# trying to get all the locks in a loop we can protect us against |
|
|
|
# admins directly deleting DB rows. |
|
|
|
session.begin() |
|
|
|
|
|
|
|
# Handle usage refresh |
|
|
|
work = set(deltas.keys()) |
|
|
|
while work: |
|
|
|
resource = work.pop() |
|
|
|
|
|
|
|
for resource in deltas.keys(): |
|
|
|
# Do we need to refresh the usage? |
|
|
|
refresh = False |
|
|
|
if resource not in usages: |
|
|
|
usages[resource] = _quota_usage_create(elevated, |
|
|
|
project_id, |
|
|
|
resource, |
|
|
|
0, 0, |
|
|
|
until_refresh or None, |
|
|
|
session=session) |
|
|
|
refresh = True |
|
|
|
elif usages[resource].in_use < 0: |
|
|
|
if usages[resource].in_use < 0: |
|
|
|
# If we created the entry right now we want to refresh. |
|
|
|
# Negative in_use count indicates a desync, so try to |
|
|
|
# heal from that... |
|
|
|
refresh = True |
|
|
@ -1168,43 +1224,12 @@ def quota_reserve(context, resources, quotas, deltas, expire, |
|
|
|
|
|
|
|
# OK, refresh the usage |
|
|
|
if refresh: |
|
|
|
# Grab the sync routine |
|
|
|
sync = QUOTA_SYNC_FUNCTIONS[resources[resource].sync] |
|
|
|
volume_type_id = getattr(resources[resource], |
|
|
|
'volume_type_id', None) |
|
|
|
volume_type_name = getattr(resources[resource], |
|
|
|
'volume_type_name', None) |
|
|
|
updates = sync(elevated, project_id, |
|
|
|
volume_type_id=volume_type_id, |
|
|
|
volume_type_name=volume_type_name, |
|
|
|
session=session) |
|
|
|
for res, in_use in updates.items(): |
|
|
|
# Make sure we have a destination for the usage! |
|
|
|
if res not in usages: |
|
|
|
usages[res] = _quota_usage_create( |
|
|
|
elevated, |
|
|
|
project_id, |
|
|
|
res, |
|
|
|
0, 0, |
|
|
|
until_refresh or None, |
|
|
|
session=session |
|
|
|
) |
|
|
|
|
|
|
|
# Update the usage |
|
|
|
usages[res].in_use = in_use |
|
|
|
usages[res].until_refresh = until_refresh or None |
|
|
|
|
|
|
|
# Because more than one resource may be refreshed |
|
|
|
# by the call to the sync routine, and we don't |
|
|
|
# want to double-sync, we make sure all refreshed |
|
|
|
# resources are dropped from the work set. |
|
|
|
work.discard(res) |
|
|
|
|
|
|
|
# NOTE(Vek): We make the assumption that the sync |
|
|
|
# routine actually refreshes the |
|
|
|
# resources that it is the sync routine |
|
|
|
# for. We don't check, because this is |
|
|
|
# a best-effort mechanism. |
|
|
|
updates = _get_sync_updates(elevated, project_id, session, |
|
|
|
resources, resource) |
|
|
|
# Updates will always contain a single resource usage matching |
|
|
|
# the resource variable. |
|
|
|
usages[resource].in_use = updates[resource] |
|
|
|
usages[resource].until_refresh = until_refresh or None |
|
|
|
|
|
|
|
# There are 3 cases where we want to update "until_refresh" in the |
|
|
|
# DB: when we enabled it, when we disabled it, and when we changed |
|
|
@ -1263,14 +1288,18 @@ def quota_reserve(context, resources, quotas, deltas, expire, |
|
|
|
if delta > 0: |
|
|
|
usages[resource].reserved += delta |
|
|
|
|
|
|
|
if unders: |
|
|
|
LOG.warning("Change will make usage less than 0 for the following " |
|
|
|
"resources: %s", unders) |
|
|
|
if overs: |
|
|
|
usages = {k: dict(in_use=v.in_use, reserved=v.reserved) |
|
|
|
for k, v in usages.items()} |
|
|
|
raise exception.OverQuota(overs=sorted(overs), quotas=quotas, |
|
|
|
usages=usages) |
|
|
|
if unders: |
|
|
|
LOG.warning("Change will make usage less than 0 for the following " |
|
|
|
"resources: %s", unders) |
|
|
|
if overs: |
|
|
|
usages = {k: dict(in_use=v.in_use, reserved=v.reserved) |
|
|
|
for k, v in usages.items()} |
|
|
|
raise exception.OverQuota(overs=sorted(overs), quotas=quotas, |
|
|
|
usages=usages) |
|
|
|
session.commit() |
|
|
|
except Exception: |
|
|
|
session.rollback() |
|
|
|
raise |
|
|
|
|
|
|
|
return reservations |
|
|
|
|
|
|
@ -6635,8 +6664,9 @@ def purge_deleted_rows(context, age_in_days): |
|
|
|
deleted.is_(True), models.QualityOfServiceSpecs. |
|
|
|
deleted_at < deleted_age)).delete() |
|
|
|
result = session.execute( |
|
|
|
table.delete() |
|
|
|
.where(table.c.deleted_at < deleted_age)) |
|
|
|
table.delete(). |
|
|
|
where(and_(table.columns.deleted.is_(True), |
|
|
|
table.c.deleted_at < deleted_age))) |
|
|
|
except db_exc.DBReferenceError as ex: |
|
|
|
LOG.error('DBError detected when purging from ' |
|
|
|
'%(tablename)s: %(error)s.', |
|
|
|