Shared filesystem management project for OpenStack.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

5752 lines
195 KiB

# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright (c) 2014 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of SQLAlchemy backend."""
import copy
import datetime
from functools import wraps
import ipaddress
import sys
import warnings
# NOTE(uglide): Required to override default oslo_db Query class
import manila.db.sqlalchemy.query # noqa
from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db import exception as db_exception
from oslo_db import options as db_options
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import utils as db_utils
from oslo_log import log
from oslo_utils import excutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import subqueryload
from sqlalchemy.sql.expression import literal
from sqlalchemy.sql.expression import true
from sqlalchemy.sql import func
from manila.common import constants
from manila.db.sqlalchemy import models
from manila.db.sqlalchemy import utils
from manila import exception
from manila.i18n import _
from manila import quota
# Process-wide oslo.config registry and the logger for this module.
CONF = cfg.CONF
LOG = log.getLogger(__name__)
# Quota engine object exported by manila.quota.
QUOTAS = quota.QUOTAS
# Quota class name that quota_class_get_default() looks up.
_DEFAULT_QUOTA_NAME = 'default'
# Resources listed here are treated as per-project rather than per-user by
# the quota helpers below (see quota_create / _quota_reserve).
PER_PROJECT_QUOTAS = []
# Lazily-built oslo.db EngineFacade; populated by _create_facade_lazily().
_FACADE = None
# Connection string registered as the oslo.db default below.
_DEFAULT_SQL_CONNECTION = 'sqlite://'
db_options.set_defaults(cfg.CONF,
                        connection=_DEFAULT_SQL_CONNECTION)
def _create_facade_lazily():
    """Return the module-wide EngineFacade, creating it on first use.

    The facade is built from ``cfg.CONF`` and cached in ``_FACADE`` so that
    later calls reuse the same object.
    """
    global _FACADE
    if _FACADE is not None:
        return _FACADE
    _FACADE = session.EngineFacade.from_config(cfg.CONF)
    return _FACADE
def get_engine():
    """Return the engine provided by the shared EngineFacade."""
    return _create_facade_lazily().get_engine()
def get_session(**kwargs):
    """Return a session from the shared EngineFacade.

    All keyword arguments are forwarded to the facade's ``get_session``.
    """
    return _create_facade_lazily().get_session(**kwargs)
def get_backend():
    """Return this module object, which acts as the DB backend."""
    this_module = sys.modules[__name__]
    return this_module
def is_admin_context(context):
    """Tell whether the request context belongs to an administrator.

    An empty/falsy context triggers a DeprecationWarning and then raises a
    generic ``Exception``; otherwise ``context.is_admin`` is returned.
    """
    if context:
        return context.is_admin
    warnings.warn(_('Use of empty request context is deprecated'),
                  DeprecationWarning)
    raise Exception('die')
def is_user_context(context):
    """Tell whether the request context belongs to a regular user.

    A context qualifies only when it is non-empty, is not an admin context,
    and carries both a ``user_id`` and a ``project_id``.
    """
    if not context or context.is_admin:
        return False
    return bool(context.user_id and context.project_id)
def authorize_project_context(context, project_id):
    """Check that a user context may access the given project.

    Non-user contexts are not checked here.

    :raises: exception.NotAuthorized when a user context has no project or
        a project different from ``project_id``.
    """
    if not is_user_context(context):
        return
    if not context.project_id or context.project_id != project_id:
        raise exception.NotAuthorized()
def authorize_user_context(context, user_id):
    """Check that a user context may access the given user.

    Non-user contexts are not checked here.

    :raises: exception.NotAuthorized when a user context has no user id or
        a user id different from ``user_id``.
    """
    if not is_user_context(context):
        return
    if not context.user_id or context.user_id != user_id:
        raise exception.NotAuthorized()
def authorize_quota_class_context(context, class_name):
    """Check that a user context may access the given quota class.

    Non-user contexts are not checked here.

    :raises: exception.NotAuthorized when a user context has no quota class
        or a quota class different from ``class_name``.
    """
    if not is_user_context(context):
        return
    if not context.quota_class or context.quota_class != class_name:
        raise exception.NotAuthorized()
def require_admin_context(f):
    """Decorator that only lets admin request contexts through.

    The decorated function must receive the context as its first
    positional argument.

    :raises: exception.AdminRequired when the context is not admin.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        context = args[0]
        if is_admin_context(context):
            return f(*args, **kwargs)
        raise exception.AdminRequired()
    return wrapper
def require_context(f):
    """Decorator that requires *any* admin or user request context.

    No project/user ownership matching is done here; see
    :py:func:`authorize_project_context` and
    :py:func:`authorize_user_context` for that.

    The decorated function must receive the context as its first
    positional argument.

    :raises: exception.NotAuthorized when the context is neither an admin
        nor a user context.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        context = args[0]
        if is_admin_context(context) or is_user_context(context):
            return f(*args, **kwargs)
        raise exception.NotAuthorized()
    return wrapper
def require_share_exists(f):
    """Decorator that looks the share up before calling the function.

    The decorated function must take ``context`` and ``share_id`` as its
    first two arguments. ``share_get`` is called with them first, so any
    error it raises propagates before the wrapped function runs.
    """
    @wraps(f)
    def wrapper(context, share_id, *args, **kwargs):
        # Only the lookup's side effect (raising on failure) matters here.
        share_get(context, share_id)
        return f(context, share_id, *args, **kwargs)
    # functools.wraps has already copied __name__ from ``f``.
    return wrapper
def require_share_instance_exists(f):
    """Decorator that looks the share instance up before calling.

    The decorated function must take ``context`` and ``share_instance_id``
    as its first two arguments. ``share_instance_get`` is called with them
    first, so any error it raises propagates before the wrapped function
    runs.
    """
    @wraps(f)
    def wrapper(context, share_instance_id, *args, **kwargs):
        # Only the lookup's side effect (raising on failure) matters here.
        share_instance_get(context, share_instance_id)
        return f(context, share_instance_id, *args, **kwargs)
    # functools.wraps has already copied __name__ from ``f``.
    return wrapper
def apply_sorting(model, query, sort_key, sort_dir):
    """Order ``query`` by ``sort_key`` in the given direction.

    :param model: model class whose attributes are the sortable columns
    :param query: query object to extend with ``order_by`` clauses
    :param sort_key: name of the model attribute to sort on
    :param sort_dir: 'asc' or 'desc' (case-insensitive)
    :returns: the query with ordering applied
    :raises: exception.InvalidInput for an unknown sort direction
    """
    direction = sort_dir.lower()
    if direction not in ('desc', 'asc'):
        msg = _("Wrong sorting data provided: sort key is '%(sort_key)s' "
                "and sort direction is '%(sort_dir)s'.") % {
                    "sort_key": sort_key, "sort_dir": sort_dir}
        raise exception.InvalidInput(reason=msg)

    # NOTE(maaoyu): A secondary sort on ID keeps results deterministic;
    # without it, rows with equal sort keys could flap between calls.
    ordering = [sort_key] if sort_key == 'id' else [sort_key, 'id']
    for column_name in ordering:
        column = getattr(model, column_name)
        query = query.order_by(getattr(column, direction)())
    return query
def handle_db_data_error(f):
    """Decorator translating oslo.db data errors into ``Invalid``.

    Any ``DBDataError`` raised by the wrapped function is logged with its
    traceback and re-raised as ``exception.Invalid``. The wrapper keeps the
    wrapped function's name and docstring, like the other decorators in
    this module.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except db_exc.DBDataError:
            msg = _('Error writing field to database.')
            LOG.exception(msg)
            raise exception.Invalid(msg)
    return wrapper
def model_query(context, model, *args, **kwargs):
    """Build a query for ``model`` honouring the context's visibility.

    :param context: context to query under
    :param model: model to query. Must be a subclass of ModelBase.
    :param session: if present, the session to use; otherwise a new one is
                    obtained from get_session()
    :param read_deleted: if present, overrides context's read_deleted field.
    :param project_only: if present and context is not admin, restrict the
                         query to the context's project_id.
    """
    session = kwargs.get('session') or get_session()
    read_deleted = kwargs.get('read_deleted') or context.read_deleted
    project_only = kwargs.get('project_only')

    # Only these derived filters are forwarded to oslo.db's helper.
    query_kwargs = {}
    if project_only and not context.is_admin:
        query_kwargs['project_id'] = context.project_id
    if read_deleted in ('no', 'n', False):
        query_kwargs['deleted'] = False
    elif read_deleted in ('yes', 'y', True):
        query_kwargs['deleted'] = True

    return db_utils.model_query(
        model=model, session=session, args=args, **query_kwargs)
def exact_filter(query, model, filters, legal_keys,
                 created_at_key='created_at'):
    """Apply exact-match filtering to a query.

    Returns the updated query. Filters that are consumed are removed from
    the ``filters`` dictionary.

    :param query: query to apply filters to
    :param model: model object the query applies to, for IN-style
                  filtering
    :param filters: dictionary of filters; values that are lists,
                    tuples, sets, or frozensets cause an 'IN' test to
                    be performed, while exact matching ('==' operator)
                    is used for other values
    :param legal_keys: list of keys to apply exact filtering to
    :param created_at_key: name of the model attribute compared against
                           the reserved 'created_since' and
                           'created_before' filters
    """
    # Reserved keys that compare against the creation timestamp.
    time_operators = {'created_since': '>=', 'created_before': '<='}
    created_at_attr = getattr(model, created_at_key, None)
    exact_matches = {}

    for key in legal_keys:
        if key not in filters:
            # Not filtering on this key.
            continue
        value = filters.pop(key)

        if key in time_operators and created_at_attr:
            normalized = timeutils.normalize_time(value)
            comparison = created_at_attr.op(time_operators[key])
            query = query.filter(comparison(normalized))
        elif isinstance(value, (list, tuple, set, frozenset)):
            # Collection value: apply an IN test to the query directly.
            query = query.filter(getattr(model, key).in_(value))
        else:
            # Plain value: gather and apply in one filter_by() below.
            exact_matches[key] = value

    if exact_matches:
        query = query.filter_by(**exact_matches)
    return query
def ensure_model_dict_has_id(model_dict):
    """Give ``model_dict`` a generated UUID 'id' if it lacks a truthy one.

    The dictionary is modified in place and also returned.
    """
    if model_dict.get('id'):
        return model_dict
    model_dict['id'] = uuidutils.generate_uuid()
    return model_dict
def _sync_shares(context, project_id, user_id, session, share_type_id=None):
    """Return the current share count as a ``{'shares': n}`` mapping."""
    share_count, _gigs = share_data_get_for_project(
        context, project_id, user_id, share_type_id=share_type_id,
        session=session)
    return dict(shares=share_count)
def _sync_snapshots(context, project_id, user_id, session, share_type_id=None):
    """Return the current snapshot count as ``{'snapshots': n}``."""
    snapshot_count, _gigs = snapshot_data_get_for_project(
        context, project_id, user_id, share_type_id=share_type_id,
        session=session)
    return dict(snapshots=snapshot_count)
def _sync_gigabytes(context, project_id, user_id, session, share_type_id=None):
    """Return the current share gigabytes as ``{'gigabytes': n}``."""
    _count, gigabytes = share_data_get_for_project(
        context, project_id, user_id, share_type_id=share_type_id,
        session=session)
    return dict(gigabytes=gigabytes)
def _sync_snapshot_gigabytes(context, project_id, user_id, session,
                             share_type_id=None):
    """Return snapshot gigabytes as ``{'snapshot_gigabytes': n}``."""
    _count, gigabytes = snapshot_data_get_for_project(
        context, project_id, user_id, share_type_id=share_type_id,
        session=session)
    return dict(snapshot_gigabytes=gigabytes)
def _sync_share_networks(context, project_id, user_id, session,
                         share_type_id=None):
    """Return the share network count as ``{'share_networks': n}``."""
    return dict(
        share_networks=count_share_networks(
            context, project_id, user_id, share_type_id=share_type_id,
            session=session))
def _sync_share_groups(context, project_id, user_id, session,
                       share_type_id=None):
    """Return the share group count as ``{'share_groups': n}``."""
    return dict(
        share_groups=count_share_groups(
            context, project_id, user_id, share_type_id=share_type_id,
            session=session))
def _sync_share_group_snapshots(context, project_id, user_id, session,
                                share_type_id=None):
    """Return the count as ``{'share_group_snapshots': n}``."""
    return dict(
        share_group_snapshots=count_share_group_snapshots(
            context, project_id, user_id, share_type_id=share_type_id,
            session=session))
def _sync_share_replicas(context, project_id, user_id, session,
                         share_type_id=None):
    """Return the share replica count as ``{'share_replicas': n}``."""
    # Unlike its siblings, this helper receives the session positionally.
    replica_count, _gigs = share_replica_data_get_for_project(
        context, project_id, user_id, session, share_type_id=share_type_id)
    return dict(share_replicas=replica_count)
def _sync_replica_gigabytes(context, project_id, user_id, session,
                            share_type_id=None):
    """Return replica gigabytes as ``{'replica_gigabytes': n}``."""
    # Unlike its siblings, this helper receives the session positionally.
    _count, gigabytes = share_replica_data_get_for_project(
        context, project_id, user_id, session, share_type_id=share_type_id)
    return dict(replica_gigabytes=gigabytes)
# Registry of usage-sync routines, keyed by function name; _quota_reserve()
# looks routines up here through each resource's ``sync`` attribute.
QUOTA_SYNC_FUNCTIONS = {
    sync_function.__name__: sync_function
    for sync_function in (
        _sync_shares,
        _sync_snapshots,
        _sync_gigabytes,
        _sync_snapshot_gigabytes,
        _sync_share_networks,
        _sync_share_groups,
        _sync_share_group_snapshots,
        _sync_share_replicas,
        _sync_replica_gigabytes,
    )
}
###################
@require_admin_context
def share_resources_host_update(context, current_host, new_host):
    """Updates the 'host' attribute of resources

    Rows of share instances, share servers and share groups whose host
    starts with ``current_host`` get that text replaced by ``new_host``.

    :returns: dict mapping 'instances', 'servers' and 'groups' to the
        number of rows updated for each.
    """
    resources = {
        'instances': models.ShareInstance,
        'servers': models.ShareServer,
        'groups': models.ShareGroup,
    }
    host_prefix = '{}%'.format(current_host)
    updated_counts = {}

    session = get_session()
    with session.begin():
        for name, model in resources.items():
            host_column = model.host
            matching = model_query(
                context, model, session=session, read_deleted="no",
            ).filter(host_column.like(host_prefix))
            new_value = func.replace(host_column, current_host, new_host)
            updated_counts[name] = matching.update(
                {host_column: new_value}, synchronize_session=False)
    return updated_counts
###################
@require_admin_context
def service_destroy(context, service_id):
    """Soft-delete the service with the given id.

    The lookup is done by service_get(), so its ServiceNotFound error
    propagates when the service does not exist.
    """
    db_session = get_session()
    with db_session.begin():
        service = service_get(context, service_id, session=db_session)
        service.soft_delete(db_session)
@require_admin_context
def service_get(context, service_id, session=None):
    """Return the service with the given id.

    :raises: exception.ServiceNotFound when no row matches.
    """
    query = model_query(context, models.Service, session=session)
    service = query.filter_by(id=service_id).first()
    if service:
        return service
    raise exception.ServiceNotFound(service_id=service_id)
@require_admin_context
def service_get_all(context, disabled=None):
    """Return all services, optionally filtered by ``disabled`` state.

    :param disabled: when not None, only services whose ``disabled``
        column equals this value are returned.
    """
    services = model_query(context, models.Service)
    if disabled is None:
        return services.all()
    return services.filter_by(disabled=disabled).all()
@require_admin_context
def service_get_all_by_topic(context, topic):
    """Return all enabled, non-deleted services for ``topic``."""
    query = model_query(context, models.Service, read_deleted="no")
    query = query.filter_by(disabled=False)
    query = query.filter_by(topic=topic)
    return query.all()
@require_admin_context
def service_get_by_host_and_topic(context, host, topic):
    """Return the first enabled service matching ``host`` and ``topic``.

    :raises: exception.ServiceNotFound (carrying the host as service_id)
        when no row matches.
    """
    query = model_query(context, models.Service, read_deleted="no")
    query = query.filter_by(disabled=False)
    query = query.filter_by(host=host)
    query = query.filter_by(topic=topic)
    service = query.first()
    if service:
        return service
    raise exception.ServiceNotFound(service_id=host)
@require_admin_context
def _service_get_all_topic_subquery(context, session, topic, subq, label):
    """Return enabled services for ``topic`` ordered by a subquery column.

    Services are outer-joined to ``subq`` on host; each result pairs the
    service with the labelled subquery value, with missing values
    coalesced to 0.
    """
    sort_column = getattr(subq.c, label)
    query = model_query(context, models.Service,
                        func.coalesce(sort_column, 0),
                        session=session, read_deleted="no")
    query = query.filter_by(topic=topic)
    query = query.filter_by(disabled=False)
    query = query.outerjoin((subq, models.Service.host == subq.c.host))
    return query.order_by(sort_column).all()
@require_admin_context
def service_get_all_share_sorted(context):
    """Return share services ordered by the summed share size per host.

    The per-host sum comes from a subquery over shares joined to their
    instances and grouped by instance host.
    """
    session = get_session()
    with session.begin():
        label = 'share_gigabytes'
        size_sum = func.sum(models.Share.size).label(label)
        share_query = model_query(context, models.Share, size_sum,
                                  session=session, read_deleted="no")
        share_query = share_query.join(
            models.ShareInstance,
            models.ShareInstance.share_id == models.Share.id)
        subq = share_query.group_by(models.ShareInstance.host).subquery()
        return _service_get_all_topic_subquery(
            context, session, CONF.share_topic, subq, label)
@require_admin_context
def service_get_by_args(context, host, binary):
    """Return the first service matching ``host`` and ``binary``.

    :raises: exception.HostBinaryNotFound when no row matches.
    """
    query = model_query(context, models.Service)
    service = query.filter_by(host=host).filter_by(binary=binary).first()
    if service:
        return service
    raise exception.HostBinaryNotFound(host=host, binary=binary)
@require_admin_context
def service_create(context, values):
    """Create and persist a service row from ``values``.

    The new service is marked disabled when ``CONF.enable_new_services``
    is false.

    :returns: the saved Service model.
    """
    db_session = get_session()
    _ensure_availability_zone_exists(context, values, db_session)

    new_service = models.Service()
    new_service.update(values)
    if not CONF.enable_new_services:
        new_service.disabled = True

    with db_session.begin():
        new_service.save(db_session)
        return new_service
@require_admin_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def service_update(context, service_id, values):
    """Apply ``values`` to an existing service and save it.

    Retried up to five times on deadlock by the wrap_db_retry decorator.
    """
    db_session = get_session()
    _ensure_availability_zone_exists(
        context, values, db_session, strict=False)
    with db_session.begin():
        service = service_get(context, service_id, session=db_session)
        service.update(values)
        service.save(session=db_session)
###################
@require_context
def quota_get_all_by_project_and_user(context, project_id, user_id):
    """Return the per-user quota limits of a project as a dict.

    :returns: dict with 'project_id', 'user_id' and one
        ``resource -> hard_limit`` entry per stored user quota.
    """
    authorize_project_context(context, project_id)

    query = model_query(
        context, models.ProjectUserQuota,
        models.ProjectUserQuota.resource,
        models.ProjectUserQuota.hard_limit,
    )
    rows = query.filter_by(
        project_id=project_id).filter_by(user_id=user_id).all()

    limits = {'project_id': project_id, 'user_id': user_id}
    for row in rows:
        limits[row.resource] = row.hard_limit
    return limits
@require_context
def quota_get_all_by_project_and_share_type(context, project_id,
                                            share_type_id):
    """Return the per-share-type quota limits of a project as a dict.

    :returns: dict with 'project_id', 'share_type_id' and one
        ``resource -> hard_limit`` entry per stored share type quota.
    """
    authorize_project_context(context, project_id)

    query = model_query(
        context, models.ProjectShareTypeQuota,
        models.ProjectShareTypeQuota.resource,
        models.ProjectShareTypeQuota.hard_limit,
    )
    rows = query.filter_by(
        project_id=project_id).filter_by(share_type_id=share_type_id).all()

    limits = {
        'project_id': project_id,
        'share_type_id': share_type_id,
    }
    for row in rows:
        limits[row.resource] = row.hard_limit
    return limits
@require_context
def quota_get_all_by_project(context, project_id):
    """Return the non-deleted project-level quota limits as a dict.

    :returns: dict with 'project_id' and one ``resource -> hard_limit``
        entry per stored project quota.
    """
    authorize_project_context(context, project_id)

    query = model_query(context, models.Quota, read_deleted="no")
    rows = query.filter_by(project_id=project_id).all()

    limits = {'project_id': project_id}
    for row in rows:
        limits[row.resource] = row.hard_limit
    return limits
@require_context
def quota_get_all(context, project_id):
    """Return every ProjectUserQuota row stored for the project."""
    authorize_project_context(context, project_id)
    query = model_query(context, models.ProjectUserQuota)
    return query.filter_by(project_id=project_id).all()
@require_admin_context
def quota_create(context, project_id, resource, limit, user_id=None,
                 share_type_id=None):
    """Create a quota limit for a project, user or share type.

    A per-user quota is created when ``user_id`` is given and the resource
    is not in PER_PROJECT_QUOTAS; otherwise a share type quota when
    ``share_type_id`` is given; otherwise a project quota.

    :returns: the saved quota model.
    :raises: exception.QuotaExists when a matching quota already exists.
    """
    per_user = user_id and resource not in PER_PROJECT_QUOTAS

    if per_user:
        model = models.ProjectUserQuota
        existing = model_query(context, model).filter(
            model.project_id == project_id,
            model.user_id == user_id,
            model.resource == resource,
        ).all()
        quota_ref = model()
        quota_ref.user_id = user_id
    elif share_type_id:
        model = models.ProjectShareTypeQuota
        existing = model_query(context, model).filter(
            model.project_id == project_id,
            model.share_type_id == share_type_id,
            model.resource == resource,
        ).all()
        quota_ref = model()
        quota_ref.share_type_id = share_type_id
    else:
        model = models.Quota
        existing = model_query(context, model).filter(
            model.project_id == project_id,
            model.resource == resource,
        ).all()
        quota_ref = model()

    if existing:
        raise exception.QuotaExists(project_id=project_id, resource=resource)

    quota_ref.project_id = project_id
    quota_ref.resource = resource
    quota_ref.hard_limit = limit

    db_session = get_session()
    with db_session.begin():
        quota_ref.save(db_session)
    return quota_ref
@require_admin_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def quota_update(context, project_id, resource, limit, user_id=None,
                 share_type_id=None):
    """Set the hard limit of an existing quota.

    The quota kind (per-user, per-share-type or per-project) is selected
    the same way as in quota_create().

    :raises: exception.ProjectUserQuotaNotFound,
        exception.ProjectShareTypeQuotaNotFound or
        exception.ProjectQuotaNotFound when no row was updated.
    """
    per_user = user_id and resource not in PER_PROJECT_QUOTAS

    if per_user:
        model = models.ProjectUserQuota
        query = model_query(context, model).filter(
            model.project_id == project_id,
            model.user_id == user_id,
            model.resource == resource,
        )
    elif share_type_id:
        model = models.ProjectShareTypeQuota
        query = model_query(context, model).filter(
            model.project_id == project_id,
            model.share_type_id == share_type_id,
            model.resource == resource,
        )
    else:
        model = models.Quota
        query = model_query(context, model).filter(
            model.project_id == project_id,
            model.resource == resource,
        )

    updated_rows = query.update({'hard_limit': limit})
    if updated_rows:
        return

    if per_user:
        raise exception.ProjectUserQuotaNotFound(
            project_id=project_id, user_id=user_id)
    if share_type_id:
        raise exception.ProjectShareTypeQuotaNotFound(
            project_id=project_id, share_type=share_type_id)
    raise exception.ProjectQuotaNotFound(project_id=project_id)
###################
@require_context
def quota_class_get(context, class_name, resource, session=None):
    """Return the non-deleted quota class row for a resource.

    :raises: exception.QuotaClassNotFound when no row matches.
    """
    query = model_query(context, models.QuotaClass, session=session,
                        read_deleted="no")
    query = query.filter_by(class_name=class_name)
    quota_class = query.filter_by(resource=resource).first()
    if quota_class:
        return quota_class
    raise exception.QuotaClassNotFound(class_name=class_name)
@require_context
def quota_class_get_default(context):
    """Return the limits of the default quota class as a dict.

    :returns: dict with 'class_name' and one ``resource -> hard_limit``
        entry per stored row.
    """
    query = model_query(context, models.QuotaClass, read_deleted="no")
    rows = query.filter_by(class_name=_DEFAULT_QUOTA_NAME).all()

    limits = {'class_name': _DEFAULT_QUOTA_NAME}
    for row in rows:
        limits[row.resource] = row.hard_limit
    return limits
@require_context
def quota_class_get_all_by_name(context, class_name):
    """Return the limits of the named quota class as a dict.

    :returns: dict with 'class_name' and one ``resource -> hard_limit``
        entry per stored row.
    """
    authorize_quota_class_context(context, class_name)

    query = model_query(context, models.QuotaClass, read_deleted="no")
    rows = query.filter_by(class_name=class_name).all()

    limits = {'class_name': class_name}
    for row in rows:
        limits[row.resource] = row.hard_limit
    return limits
@require_admin_context
def quota_class_create(context, class_name, resource, limit):
    """Create and persist a quota class limit.

    :returns: the saved QuotaClass model.
    """
    new_quota_class = models.QuotaClass()
    new_quota_class.class_name = class_name
    new_quota_class.resource = resource
    new_quota_class.hard_limit = limit

    db_session = get_session()
    with db_session.begin():
        new_quota_class.save(db_session)
    return new_quota_class
@require_admin_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def quota_class_update(context, class_name, resource, limit):
    """Set the hard limit of an existing quota class entry.

    :raises: exception.QuotaClassNotFound when no row was updated.
    """
    query = model_query(context, models.QuotaClass, read_deleted="no")
    query = query.filter_by(class_name=class_name)
    query = query.filter_by(resource=resource)
    updated_rows = query.update({'hard_limit': limit})
    if not updated_rows:
        raise exception.QuotaClassNotFound(class_name=class_name)
###################
@require_context
def quota_usage_get(context, project_id, resource, user_id=None,
                    share_type_id=None):
    """Return the quota usage row for a resource in a project.

    :param user_id: when given, look up the user's usage row, or the row
        with no user for resources listed in PER_PROJECT_QUOTAS
    :param share_type_id: when given (and ``user_id`` is not), look up the
        usage row of that share type
    :raises: exception.QuotaUsageNotFound when no row matches.
    """
    query = (model_query(context, models.QuotaUsage, read_deleted="no").
             filter_by(project_id=project_id).
             filter_by(resource=resource))
    if user_id:
        if resource not in PER_PROJECT_QUOTAS:
            result = query.filter_by(user_id=user_id).first()
        else:
            result = query.filter_by(user_id=None).first()
    elif share_type_id:
        # The keyword must name the real column; the former misspelling
        # ("queryshare_type_id") made this branch fail for every call.
        result = query.filter_by(share_type_id=share_type_id).first()
    else:
        result = query.first()
    if not result:
        raise exception.QuotaUsageNotFound(project_id=project_id)
    return result
def _quota_usage_get_all(context, project_id, user_id=None,
                         share_type_id=None):
    """Collect quota usages of a project into a dict.

    :returns: dict with 'project_id' (plus 'user_id' or 'share_type_id'
        when given) and, per resource, a dict of summed 'in_use' and
        'reserved' values.
    """
    authorize_project_context(context, project_id)
    query = model_query(
        context, models.QuotaUsage, read_deleted="no",
    ).filter_by(project_id=project_id)
    usages = {'project_id': project_id}

    if user_id:
        # NOTE(review): ``user_id is None`` is evaluated by Python, not
        # turned into SQL, so the second or_() operand is always False.
        # Switching to an IS NULL test may also match share type rows;
        # confirm the intended semantics before changing it.
        query = query.filter(or_(models.QuotaUsage.user_id == user_id,
                                 models.QuotaUsage.user_id is None))
        usages['user_id'] = user_id
    elif share_type_id:
        query = query.filter_by(share_type_id=share_type_id)
        usages['share_type_id'] = share_type_id
    else:
        query = query.filter_by(share_type_id=None)

    for row in query.all():
        if row.resource not in usages:
            usages[row.resource] = dict(in_use=row.in_use,
                                        reserved=row.reserved)
            continue
        usages[row.resource]['in_use'] += row.in_use
        usages[row.resource]['reserved'] += row.reserved
    return usages
@require_context
def quota_usage_get_all_by_project(context, project_id):
    """Return the project's quota usages (see _quota_usage_get_all)."""
    usages = _quota_usage_get_all(context, project_id)
    return usages
@require_context
def quota_usage_get_all_by_project_and_user(context, project_id, user_id):
    """Return a user's quota usages (see _quota_usage_get_all)."""
    usages = _quota_usage_get_all(context, project_id, user_id=user_id)
    return usages
@require_context
def quota_usage_get_all_by_project_and_share_type(context, project_id,
                                                  share_type_id):
    """Return a share type's quota usages (see _quota_usage_get_all)."""
    usages = _quota_usage_get_all(
        context, project_id, share_type_id=share_type_id)
    return usages
def _quota_usage_create(context, project_id, user_id, resource, in_use,
                        reserved, until_refresh, share_type_id=None,
                        session=None):
    """Create and save a QuotaUsage row.

    The row is tied to ``share_type_id`` when one is given, otherwise to
    ``user_id``.

    :returns: the saved QuotaUsage model.
    """
    usage = models.QuotaUsage()
    if share_type_id:
        usage.share_type_id = share_type_id
    else:
        usage.user_id = user_id
    usage.project_id = project_id
    usage.resource = resource
    usage.in_use = in_use
    usage.reserved = reserved
    usage.until_refresh = until_refresh
    # updated_at is needed for judgement of max_age
    usage.updated_at = timeutils.utcnow()

    usage.save(session=session)
    return usage
@require_admin_context
def quota_usage_create(context, project_id, user_id, resource, in_use,
                       reserved, until_refresh, share_type_id=None):
    """Create a QuotaUsage row using a fresh session.

    :returns: the saved QuotaUsage model.
    """
    db_session = get_session()
    usage = _quota_usage_create(
        context, project_id, user_id, resource, in_use, reserved,
        until_refresh, share_type_id=share_type_id, session=db_session)
    return usage
@require_admin_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def quota_usage_update(context, project_id, user_id, resource,
                       share_type_id=None, **kwargs):
    """Update selected fields of matching quota usage rows.

    Only 'in_use', 'reserved' and 'until_refresh' are taken from
    ``kwargs``; any other keyword is ignored.

    :raises: exception.QuotaUsageNotFound when no row was updated.
    """
    updatable = ('in_use', 'reserved', 'until_refresh')
    updates = {key: kwargs[key] for key in updatable if key in kwargs}

    query = model_query(context, models.QuotaUsage, read_deleted="no")
    query = query.filter_by(project_id=project_id)
    query = query.filter_by(resource=resource)
    if share_type_id:
        query = query.filter_by(share_type_id=share_type_id)
    else:
        # NOTE(review): ``user_id is None`` is evaluated by Python, not
        # turned into SQL, so the second or_() operand is always False.
        # Confirm the intended semantics before changing it.
        query = query.filter(or_(models.QuotaUsage.user_id == user_id,
                                 models.QuotaUsage.user_id is None))

    updated_rows = query.update(updates)
    if not updated_rows:
        raise exception.QuotaUsageNotFound(project_id=project_id)
###################
def _reservation_create(context, uuid, usage, project_id, user_id, resource,
                        delta, expire, share_type_id=None, session=None):
    """Create and save a Reservation row linked to a usage record.

    The reservation is tied to ``share_type_id`` when one is given,
    otherwise to ``user_id``.

    :returns: the saved Reservation model.
    """
    reservation = models.Reservation()
    reservation.uuid = uuid
    reservation.usage_id = usage['id']
    reservation.project_id = project_id
    if share_type_id:
        reservation.share_type_id = share_type_id
    else:
        reservation.user_id = user_id
    reservation.resource = resource
    reservation.delta = delta
    reservation.expire = expire

    reservation.save(session=session)
    return reservation
###################
# NOTE(johannes): The quota code uses SQL locking to ensure races don't
# cause under or over counting of resources. To avoid deadlocks, this
# code always acquires the lock on quota_usages before acquiring the lock
# on reservations.
def _get_share_type_quota_usages(context, session, project_id, share_type_id):
    """Return locked usage rows of a share type, keyed by resource.

    Rows are selected with an 'update' lock mode.
    """
    query = model_query(
        context, models.QuotaUsage, read_deleted="no", session=session)
    query = query.filter(
        models.QuotaUsage.project_id == project_id,
        models.QuotaUsage.share_type_id == share_type_id,
    )
    usages = {}
    for row in query.with_lockmode('update').all():
        usages[row.resource] = row
    return usages
def _get_user_quota_usages(context, session, project_id, user_id):
    """Return locked usage rows of a user, keyed by resource.

    Rows are selected with an 'update' lock mode. Broken out of the
    reservation code for testability.
    """
    query = model_query(context, models.QuotaUsage,
                        read_deleted="no",
                        session=session)
    query = query.filter_by(project_id=project_id)
    # NOTE(review): ``user_id is None`` is evaluated by Python, not turned
    # into SQL, so the second or_() operand is always False. Switching to
    # an IS NULL test may also match share type rows; confirm the intended
    # semantics before changing it.
    query = query.filter(or_(models.QuotaUsage.user_id == user_id,
                             models.QuotaUsage.user_id is None))
    usages = {}
    for row in query.with_lockmode('update').all():
        usages[row.resource] = row
    return usages
def _get_project_quota_usages(context, session, project_id):
    """Return per-resource usage totals for a whole project.

    Usage rows of the project that are not tied to a share type are
    selected with an 'update' lock mode and summed per resource.

    :returns: dict mapping each resource to a dict with 'in_use',
        'reserved' and 'total' (in_use + reserved) values.
    """
    rows = (model_query(context, models.QuotaUsage,
                        read_deleted="no",
                        session=session).
            filter_by(project_id=project_id).
            # Use the SQL-level IS NULL test. A Python ``is None`` check
            # on the column attribute is always False, which made this
            # query match no rows at all.
            filter(models.QuotaUsage.share_type_id.is_(None)).
            with_lockmode('update').
            all())
    result = dict()
    # Get the total count of in_use,reserved
    for row in rows:
        if row.resource in result:
            result[row.resource]['in_use'] += row.in_use
            result[row.resource]['reserved'] += row.reserved
            result[row.resource]['total'] += (row.in_use + row.reserved)
        else:
            result[row.resource] = dict(in_use=row.in_use,
                                        reserved=row.reserved,
                                        total=row.in_use + row.reserved)
    return result
@require_context
def quota_reserve(context, resources, project_quotas, user_quotas,
                  share_type_quotas, deltas, expire, until_refresh,
                  max_age, project_id=None, user_id=None, share_type_id=None,
                  overquota_allowed=False):
    """Reserve quota for a user and, optionally, for a share type.

    The user-level reservation is made first. When ``share_type_id`` is
    given, a share type reservation follows; if that raises OverQuota, the
    user-level reservations are rolled back and the error is re-raised.

    :returns: list of reservation UUIDs (user ones, then share type ones).
    """
    user_reservations = _quota_reserve(
        context, resources, project_quotas, user_quotas,
        deltas, expire, until_refresh, max_age, project_id, user_id=user_id,
        overquota_allowed=overquota_allowed)
    if not share_type_id:
        return user_reservations

    try:
        st_reservations = _quota_reserve(
            context, resources, project_quotas, share_type_quotas,
            deltas, expire, until_refresh, max_age, project_id,
            share_type_id=share_type_id,
            overquota_allowed=overquota_allowed)
    except exception.OverQuota:
        with excutils.save_and_reraise_exception():
            # rollback previous reservations
            reservation_rollback(
                context, user_reservations,
                project_id=project_id, user_id=user_id)
    return user_reservations + st_reservations
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def _quota_reserve(context, resources, project_quotas, user_or_st_quotas,
                   deltas, expire, until_refresh,
                   max_age, project_id=None, user_id=None, share_type_id=None,
                   overquota_allowed=False):
    """Reserve quota for either a user or a share type.

    Inside one transaction this refreshes stale usage records, checks the
    requested deltas against the project and user/share-type limits, and
    creates one reservation per delta when nothing is over quota.

    :param resources: mapping of resource name to an object whose ``sync``
        attribute names an entry of QUOTA_SYNC_FUNCTIONS
    :param project_quotas: mapping of resource name to project limit
    :param user_or_st_quotas: mapping of resource name to the user or
        share type limit
    :param deltas: mapping of resource name to the requested change
    :param expire: expiration value stored on each reservation
    :param until_refresh: refresh countdown stored on usage records
    :param max_age: usage records older than this many seconds are
        refreshed
    :param project_id: project to reserve for; defaults to the context's
    :param user_id: user to reserve for; defaults to the context's when no
        share type is given
    :param share_type_id: when given, reserve against share type usages
    :param overquota_allowed: when true, exceeded quotas are only logged
    :returns: list of reservation UUIDs
    :raises: exception.OverQuota when a limit would be exceeded and
        ``overquota_allowed`` is false
    """
    elevated = context.elevated()
    session = get_session()
    with session.begin():
        if project_id is None:
            project_id = context.project_id
        if share_type_id:
            user_or_st_usages = _get_share_type_quota_usages(
                context, session, project_id, share_type_id)
        else:
            user_id = user_id if user_id else context.user_id
            user_or_st_usages = _get_user_quota_usages(
                context, session, project_id, user_id)

        # Get the current usages
        project_usages = _get_project_quota_usages(
            context, session, project_id)

        # Handle usage refresh
        work = set(deltas.keys())
        while work:
            resource = work.pop()

            # Do we need to refresh the usage?
            refresh = False
            if ((resource not in PER_PROJECT_QUOTAS) and
                    (resource not in user_or_st_usages)):
                user_or_st_usages[resource] = _quota_usage_create(
                    elevated,
                    project_id,
                    user_id,
                    resource,
                    0, 0,
                    until_refresh or None,
                    share_type_id=share_type_id,
                    session=session)
                refresh = True
            elif ((resource in PER_PROJECT_QUOTAS) and
                    (resource not in user_or_st_usages)):
                user_or_st_usages[resource] = _quota_usage_create(
                    elevated,
                    project_id,
                    None,
                    resource,
                    0, 0,
                    until_refresh or None,
                    share_type_id=share_type_id,
                    session=session)
                refresh = True
            elif user_or_st_usages[resource].in_use < 0:
                # Negative in_use count indicates a desync, so try to
                # heal from that...
                refresh = True
            elif user_or_st_usages[resource].until_refresh is not None:
                user_or_st_usages[resource].until_refresh -= 1
                if user_or_st_usages[resource].until_refresh <= 0:
                    refresh = True
            elif max_age and (
                    timeutils.utcnow() -
                    user_or_st_usages[resource].updated_at
            ).total_seconds() >= max_age:
                # The record's age is "now - updated_at". total_seconds()
                # is used because ``.seconds`` drops whole days and is
                # meaningless for the negative delta the reversed
                # subtraction used to produce.
                refresh = True

            # OK, refresh the usage
            if refresh:
                # Grab the sync routine
                sync = QUOTA_SYNC_FUNCTIONS[resources[resource].sync]

                updates = sync(
                    elevated, project_id, user_id,
                    share_type_id=share_type_id, session=session)
                for res, in_use in updates.items():
                    # Make sure we have a destination for the usage!
                    if ((res not in PER_PROJECT_QUOTAS) and
                            (res not in user_or_st_usages)):
                        user_or_st_usages[res] = _quota_usage_create(
                            elevated,
                            project_id,
                            user_id,
                            res,
                            0, 0,
                            until_refresh or None,
                            share_type_id=share_type_id,
                            session=session)
                    if ((res in PER_PROJECT_QUOTAS) and
                            (res not in user_or_st_usages)):
                        user_or_st_usages[res] = _quota_usage_create(
                            elevated,
                            project_id,
                            None,
                            res,
                            0, 0,
                            until_refresh or None,
                            share_type_id=share_type_id,
                            session=session)

                    if user_or_st_usages[res].in_use != in_use:
                        LOG.debug(
                            'quota_usages out of sync, updating. '
                            'project_id: %(project_id)s, '
                            'user_id: %(user_id)s, '
                            'share_type_id: %(share_type_id)s, '
                            'resource: %(res)s, '
                            'tracked usage: %(tracked_use)s, '
                            'actual usage: %(in_use)s',
                            {'project_id': project_id,
                             'user_id': user_id,
                             'share_type_id': share_type_id,
                             'res': res,
                             'tracked_use': user_or_st_usages[res].in_use,
                             'in_use': in_use})

                    # Update the usage
                    user_or_st_usages[res].in_use = in_use
                    user_or_st_usages[res].until_refresh = (
                        until_refresh or None)

                    # Because more than one resource may be refreshed
                    # by the call to the sync routine, and we don't
                    # want to double-sync, we make sure all refreshed
                    # resources are dropped from the work set.
                    work.discard(res)

                    # NOTE(Vek): We make the assumption that the sync
                    #            routine actually refreshes the
                    #            resources that it is the sync routine
                    #            for.  We don't check, because this is
                    #            a best-effort mechanism.

        # Check for deltas that would go negative
        unders = [res for res, delta in deltas.items()
                  if delta < 0 and
                  delta + user_or_st_usages[res].in_use < 0]

        # Now, let's check the quotas
        # NOTE(Vek): We're only concerned about positive increments.
        #            If a project has gone over quota, we want them to
        #            be able to reduce their usage without any
        #            problems.
        for key, value in user_or_st_usages.items():
            if key not in project_usages:
                project_usages[key] = value
        overs = [res for res, delta in deltas.items()
                 if user_or_st_quotas[res] >= 0 and delta >= 0 and
                 (0 <= project_quotas[res] < delta +
                  project_usages[res]['total'] or
                  user_or_st_quotas[res] < delta +
                  user_or_st_usages[res].total)]

        # NOTE(carloss): If OverQuota is allowed, there is no problem to exceed
        # the quotas, so we reset the overs list and LOG it.
        if overs and overquota_allowed:
            msg = _("The service has identified one or more exceeded "
                    "quotas. Please check the quotas for project "
                    "%(project_id)s, user %(user_id)s and share type "
                    "%(share_type_id)s, and adjust them if "
                    "necessary.") % {
                "project_id": project_id,
                "user_id": user_id,
                "share_type_id": share_type_id
            }
            LOG.warning(msg)
            overs = []

        # NOTE(Vek): The quota check needs to be in the transaction,
        #            but the transaction doesn't fail just because
        #            we're over quota, so the OverQuota raise is
        #            outside the transaction.  If we did the raise
        #            here, our usage updates would be discarded, but
        #            they're not invalidated by being over-quota.

        # Create the reservations
        if not overs:
            reservations = []
            for res, delta in deltas.items():
                reservation = _reservation_create(elevated,
                                                  uuidutils.generate_uuid(),
                                                  user_or_st_usages[res],
                                                  project_id,
                                                  user_id,
                                                  res, delta, expire,
                                                  share_type_id=share_type_id,
                                                  session=session)
                reservations.append(reservation.uuid)

                # Also update the reserved quantity
                # NOTE(Vek): Again, we are only concerned here about
                #            positive increments.  Here, though, we're
                #            worried about the following scenario:
                #
                #            1) User initiates resize down.
                #            2) User allocates a new instance.
                #            3) Resize down fails or is reverted.
                #            4) User is now over quota.
                #
                #            To prevent this, we only update the
                #            reserved value if the delta is positive.
                if delta > 0:
                    user_or_st_usages[res].reserved += delta

        # Apply updates to the usages table
        for usage_ref in user_or_st_usages.values():
            session.add(usage_ref)

    if unders:
        LOG.warning("Change will make usage less than 0 for the following "
                    "resources: %s", unders)
    if overs:
        if project_quotas == user_or_st_quotas:
            usages = project_usages
        else:
            usages = user_or_st_usages
        usages = {k: dict(in_use=v['in_use'], reserved=v['reserved'])
                  for k, v in usages.items()}
        raise exception.OverQuota(
            overs=sorted(overs), quotas=user_or_st_quotas, usages=usages)

    return reservations
def _quota_reservations_query(session, context, reservations):
    """Return a row-locking query for the listed reservations.

    :param session: DB session the query is bound to
    :param context: request context handed to ``model_query``
    :param reservations: iterable of reservation UUIDs to select
    :returns: query over non-deleted ``models.Reservation`` rows whose
        uuid is in ``reservations``, issued with ``FOR UPDATE`` locking
    """
    query = model_query(context, models.Reservation,
                        read_deleted="no", session=session)
    # NOTE: with_for_update() is the supported spelling of the deprecated
    # with_lockmode('update'); both request SELECT ... FOR UPDATE.
    return (query.
            filter(models.Reservation.uuid.in_(reservations)).
            with_for_update())
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def reservation_commit(context, reservations, project_id=None, user_id=None,
                       share_type_id=None):
    """Commit reservations into the matching quota usages.

    :param context: request context for queries
    :param reservations: UUIDs of the reservations to commit
    :param project_id: project whose usages are loaded
    :param user_id: user whose usages are loaded
    :param share_type_id: when set, share type usages are loaded as well
    """
    session = get_session()
    with session.begin():
        st_usages = {}
        if share_type_id:
            st_usages = _get_share_type_quota_usages(
                context, session, project_id, share_type_id)
        user_usages = _get_user_quota_usages(
            context, session, project_id, user_id)

        reservation_query = _quota_reservations_query(
            session, context, reservations)
        for reservation in reservation_query.all():
            # Share type reservations are accounted against the share type
            # usages, everything else against the user usages.
            usages = (st_usages if reservation['share_type_id']
                      else user_usages)
            usage = usages[reservation.resource]
            # Only non-negative deltas were added to 'reserved', so only
            # those are released; 'in_use' always takes the delta.
            if reservation.delta >= 0:
                usage.reserved -= reservation.delta
            usage.in_use += reservation.delta

        reservation_query.soft_delete(synchronize_session=False)
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def reservation_rollback(context, reservations, project_id=None, user_id=None,
                         share_type_id=None):
    """Roll back reservations, releasing their reserved amounts.

    :param context: request context for queries
    :param reservations: UUIDs of the reservations to roll back
    :param project_id: project whose usages are loaded
    :param user_id: user whose usages are loaded
    :param share_type_id: when set, share type usages are loaded as well
    """
    session = get_session()
    with session.begin():
        st_usages = {}
        if share_type_id:
            st_usages = _get_share_type_quota_usages(
                context, session, project_id, share_type_id)
        user_usages = _get_user_quota_usages(
            context, session, project_id, user_id)

        reservation_query = _quota_reservations_query(
            session, context, reservations)
        for reservation in reservation_query.all():
            usages = (st_usages if reservation['share_type_id']
                      else user_usages)
            usage = usages[reservation.resource]
            # Negative deltas never touched 'reserved', so there is
            # nothing to give back for them.
            if reservation.delta >= 0:
                usage.reserved -= reservation.delta

        reservation_query.soft_delete(synchronize_session=False)
@require_admin_context
def quota_destroy_all_by_project_and_user(context, project_id, user_id):
    """Soft delete user quotas, usages and reservations of a project user.

    :param context: request context for queries
    :param project_id: ID of the project to filter by
    :param user_id: ID of the user to filter by
    """
    doomed_models = (
        models.ProjectUserQuota,
        models.QuotaUsage,
        models.Reservation,
    )
    session = get_session()
    with session.begin():
        for model in doomed_models:
            query = model_query(context, model, session=session,
                                read_deleted="no")
            query = query.filter_by(project_id=project_id)
            query = query.filter_by(user_id=user_id)
            query.soft_delete(synchronize_session=False)
@require_admin_context
def quota_destroy_all_by_share_type(context, share_type_id, project_id=None):
    """Soft deletes all quotas, usages and reservations.

    :param context: request context for queries, updates and logging
    :param share_type_id: ID of the share type to filter the quotas, usages
        and reservations under.
    :param project_id: ID of the project to filter the quotas, usages and
        reservations under. If not provided, share type quotas for all
        projects will be acted upon.
    """
    doomed_models = (
        models.ProjectShareTypeQuota,
        models.QuotaUsage,
        models.Reservation,
    )
    session = get_session()
    with session.begin():
        queries = [
            model_query(
                context, model, session=session, read_deleted="no",
            ).filter_by(share_type_id=share_type_id)
            for model in doomed_models
        ]

        # Narrow every query to a single project only when one was given.
        if project_id is not None:
            queries = [
                query.filter_by(project_id=project_id) for query in queries
            ]

        for query in queries:
            query.soft_delete(synchronize_session=False)
@require_admin_context
def quota_destroy_all_by_project(context, project_id):
    """Soft delete all quotas, usages and reservations of a project.

    :param context: request context for queries
    :param project_id: ID of the project whose records are soft deleted
    """
    doomed_models = (
        models.Quota,
        models.ProjectUserQuota,
        models.QuotaUsage,
        models.Reservation,
    )
    session = get_session()
    with session.begin():
        for model in doomed_models:
            query = model_query(context, model, session=session,
                                read_deleted="no")
            query = query.filter_by(project_id=project_id)
            query.soft_delete(synchronize_session=False)
@require_admin_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def reservation_expire(context):
    """Release and soft delete every reservation past its expiry time.

    For each expired reservation with a non-negative delta, the delta is
    subtracted from the ``reserved`` counter of the quota usage it points
    to. All expired reservations are then soft deleted.

    :param context: request context for queries
    """
    session = get_session()
    with session.begin():
        current_time = timeutils.utcnow()
        reservation_query = (model_query(
            context, models.Reservation,
            session=session, read_deleted="no").
            filter(models.Reservation.expire < current_time))

        for reservation in reservation_query.all():
            # Negative deltas were never added to 'reserved'.
            if reservation.delta < 0:
                continue
            quota_usage = model_query(context, models.QuotaUsage,
                                      session=session,
                                      read_deleted="no").filter(
                models.QuotaUsage.id == reservation.usage_id).first()
            if quota_usage is None:
                # The usage row is no longer there, so there is nothing
                # to release; do not let one orphaned reservation abort
                # the expiry of all the others.
                LOG.warning("Quota usage %(usage_id)s of expired "
                            "reservation %(uuid)s was not found; skipping "
                            "the release of its reserved amount.",
                            {'usage_id': reservation.usage_id,
                             'uuid': reservation.uuid})
                continue
            quota_usage.reserved -= reservation.delta
            session.add(quota_usage)

        reservation_query.soft_delete(synchronize_session=False)
################
def _extract_subdict_by_fields(source_dict, fields):
dict_to_extract_from = copy.deepcopy(source_dict)
sub_dict = {}
for field in fields:
field_value = dict_to_extract_from.pop(field, None)
if field_value:
sub_dict.update({field: field_value})
return sub_dict, dict_to_extract_from
def _extract_share_instance_values(values):
    """Separate share instance fields from the rest of the share values.

    :param values: dict mixing share and share instance fields
    :returns: tuple of (share instance values, share values)
    """
    instance_fields = [
        'status',
        'host',
        'scheduled_at',
        'launched_at',
        'terminated_at',
        'share_server_id',
        'share_network_id',
        'availability_zone',
        'replica_state',
        'share_type_id',
        'share_type',
        'access_rules_status',
    ]
    return _extract_subdict_by_fields(values, instance_fields)
def _change_size_to_instance_size(snap_instance_values):
if 'size' in snap_instance_values:
snap_instance_values['instance_size'] = snap_instance_values['size']
snap_instance_values.pop('size')
def _extract_snapshot_instance_values(values):
    """Separate snapshot instance fields from the other snapshot values.

    :param values: dict mixing snapshot and snapshot instance fields
    :returns: tuple of (snapshot instance values, snapshot values)
    """
    instance_fields = ['status', 'progress', 'provider_location']
    return _extract_subdict_by_fields(values, instance_fields)
################
@require_context
def share_instance_create(context, share_id, values):
    """Create a share instance for a share inside a new transaction.

    :param context: request context for queries
    :param share_id: ID of the parent share
    :param values: dict of share instance values
    :returns: the result of ``_share_instance_create``
    """
    session = get_session()
    with session.begin():
        new_instance = _share_instance_create(
            context, share_id, values, session)
        return new_instance
def _share_instance_create(context, share_id, values, session):
    """Persist a new share instance and return it re-read from the DB.

    ``values`` is updated in place: an 'id' is generated when none is set
    and 'share_id' is set to ``share_id``.

    :param context: request context for queries
    :param share_id: ID of the parent share
    :param values: dict of share instance values
    :param session: DB session to save and re-read the instance with
    :returns: the share instance as returned by ``share_instance_get``
    """
    if not values.get('id'):
        values['id'] = uuidutils.generate_uuid()
    values['share_id'] = share_id

    instance = models.ShareInstance()
    instance.update(values)
    instance.save(session=session)

    return share_instance_get(context, instance['id'], session=session)
@require_context
def share_instance_update(context, share_instance_id, values,
                          with_share_data=False):
    """Update a share instance with the given values.

    :param context: request context for queries
    :param share_instance_id: ID of the share instance to update
    :param values: dict of values to apply
    :param with_share_data: when True, the parent share is loaded and
        attached to the instance via ``set_share_data``
    :returns: the updated share instance
    """
    session = get_session()
    _ensure_availability_zone_exists(context, values, session, strict=False)
    with session.begin():
        updated = _share_instance_update(
            context, share_instance_id, values, session)
        if with_share_data:
            share = share_get(context, updated['share_id'], session=session)
            updated.set_share_data(share)
        return updated
def share_and_snapshot_instances_status_update(
        context, values, share_instance_ids=None, snapshot_instance_ids=None,
        current_expected_status=None):
    """Update share and snapshot instances in a single transaction.

    When ``current_expected_status`` is given, every listed share instance
    and snapshot instance must currently be in that status; both checks
    run before any update is issued.

    :param context: request context for queries
    :param values: dict of values to apply to the listed instances
    :param share_instance_ids: IDs of the share instances to update
    :param snapshot_instance_ids: IDs of the snapshot instances to update
    :param current_expected_status: status all listed instances must have
    :returns: tuple of (share instances update result, snapshot instances
        update result); an element is None when its ID list is empty
    :raises: exception.InvalidShareInstance,
        exception.InvalidShareSnapshotInstance
    """
    share_result = None
    snapshot_result = None
    session = get_session()
    with session.begin():
        if current_expected_status and share_instance_ids:
            share_instances = share_instances_get_all(
                context, filters={'instance_ids': share_instance_ids},
                session=session)
            shares_compliant = all(
                inst['status'] == current_expected_status
                for inst in share_instances)
            if not shares_compliant:
                msg = _('At least one of the shares is not in the %(status)s '
                        'status.') % {
                    'status': current_expected_status
                }
                raise exception.InvalidShareInstance(reason=msg)

        if current_expected_status and snapshot_instance_ids:
            snapshot_instances = share_snapshot_instance_get_all_with_filters(
                context, {'instance_ids': snapshot_instance_ids},
                session=session)
            snapshots_compliant = all(
                snap['status'] == current_expected_status
                for snap in snapshot_instances)
            if not snapshots_compliant:
                msg = _('At least one of the snapshots is not in the '
                        '%(status)s status.') % {
                    'status': current_expected_status
                }
                raise exception.InvalidShareSnapshotInstance(reason=msg)

        if share_instance_ids:
            share_result = share_instances_status_update(
                context, share_instance_ids, values, session=session)

        if snapshot_instance_ids:
            snapshot_result = share_snapshot_instances_status_update(
                context, snapshot_instance_ids, values, session=session)

    return share_result, snapshot_result
@require_context
def share_instances_status_update(
        context, share_instance_ids, values, session=None):
    """Bulk update the listed, non-deleted share instances.

    :param context: request context for queries
    :param share_instance_ids: IDs of the share instances to update
    :param values: dict of column values to set
    :param session: DB session to use; a new one is obtained when omitted
    :returns: the result of the query ``update`` call
    """
    session = session or get_session()
    query = model_query(
        context, models.ShareInstance, read_deleted="no", session=session)
    query = query.filter(models.ShareInstance.id.in_(share_instance_ids))
    return query.update(values, synchronize_session=False)
def _share_instance_update(context, share_instance_id, values, session):
    """Load a share instance, apply ``values`` to it and save it.

    :param context: request context for queries
    :param share_instance_id: ID of the share instance to update
    :param values: dict of values to apply
    :param session: DB session to load and save the instance with
    :returns: the updated share instance
    """
    instance = share_instance_get(
        context, share_instance_id, session=session)
    instance.update(values)
    instance.save(session=session)
    return instance
@require_context
def share_instance_get(context, share_instance_id, session=None,
                       with_share_data=False):
    """Fetch one share instance by ID.

    Export locations and the share type are joined-loaded.

    :param context: request context for queries
    :param share_instance_id: ID of the share instance to fetch
    :param session: DB session to use; a new one is obtained when omitted
    :param with_share_data: when True, the parent share is loaded and
        attached to the instance via ``set_share_data``
    :returns: the share instance
    :raises: exception.NotFound
    """
    if session is None:
        session = get_session()

    query = model_query(context, models.ShareInstance, session=session)
    query = query.filter_by(id=share_instance_id)
    query = query.options(
        joinedload('export_locations'),
        joinedload('share_type'),
    )
    instance = query.first()
    if instance is None:
        raise exception.NotFound()

    if with_share_data:
        share = share_get(context, instance['share_id'], session=session)
        instance.set_share_data(share)
    return instance
@require_admin_context
def share_instances_get_all(context, filters=None, session=None):
    """Return the non-deleted share instances matching the filters.

    Recognised filter keys: 'export_location_id', 'export_location_path',
    'instance_ids', 'host' and 'share_server_id'. A 'host' filter matches
    the exact host as well as any host starting with "<host>#".

    :param context: request context for queries
    :param filters: dict of filters; falsy values are ignored
    :param session: DB session to use; a new one is obtained when omitted
    :returns: list of share instances
    """
    session = session or get_session()
    filters = filters or {}
    instance_model = models.ShareInstance
    location_model = models.ShareInstanceExportLocations

    query = model_query(
        context, instance_model, session=session, read_deleted="no",
    ).options(
        joinedload('export_locations'),
    )

    export_location_id = filters.get('export_location_id')
    export_location_path = filters.get('export_location_path')
    if export_location_id or export_location_path:
        # The export locations table is only joined when it is filtered on.
        query = query.join(
            location_model,
            location_model.share_instance_id == instance_model.id)
        if export_location_path:
            query = query.filter(
                location_model.path == export_location_path)
        if export_location_id:
            query = query.filter(
                location_model.uuid == export_location_id)

    instance_ids = filters.get('instance_ids')
    if instance_ids:
        query = query.filter(instance_model.id.in_(instance_ids))

    # TODO(gouthamr): This DB API method needs to be generalized for all
    # share instance fields.
    host = filters.get('host')
    if host:
        query = query.filter(
            or_(instance_model.host == host,
                instance_model.host.like("{0}#%".format(host)))
        )

    share_server_id = filters.get('share_server_id')
    if share_server_id:
        query = query.filter(
            instance_model.share_server_id == share_server_id)

    return query.all()
@require_context
def _update_share_instance_usages(context, share, instance_ref,
                                  is_replica=False):
    """Release the quota usages held by a share instance being deleted.

    Failures are logged and swallowed; a reservation that was made but
    could not be committed is rolled back.

    :param context: request context for the quota calls
    :param share: parent share of the instance; its project, user and
        size determine what is released
    :param instance_ref: the share instance being deleted
    :param is_replica: whether the instance is a share replica
    """
    deltas = {}
    no_instances_remain = not share.instances
    share_usages_to_release = {"shares": -1, "gigabytes": -share['size']}
    replica_usages_to_release = {"share_replicas": -1,
                                 "replica_gigabytes": -share['size']}

    if is_replica and no_instances_remain:
        # A share that had a replication_type is being deleted, so there's
        # need to update the share replica quotas and the share quotas
        deltas.update(replica_usages_to_release)
        deltas.update(share_usages_to_release)
    elif is_replica:
        # The user is deleting a share replica
        deltas.update(replica_usages_to_release)
    else:
        # A share with no replication_type is being deleted
        deltas.update(share_usages_to_release)

    reservations = None
    try:
        # we give the user_id of the share, to update
        # the quota usage for the user, who created the share
        reservations = QUOTAS.reserve(
            context,
            project_id=share['project_id'],
            user_id=share['user_id'],
            share_type_id=instance_ref['share_type_id'],
            **deltas)
        QUOTAS.commit(
            context, reservations, project_id=share['project_id'],
            user_id=share['user_id'],
            share_type_id=instance_ref['share_type_id'])
    except Exception:
        resource_name = (
            'share replica' if is_replica else 'share')
        resource_id = instance_ref['id'] if is_replica else share['id']
        msg = (_("Failed to update usages deleting %(resource_name)s "
                 "'%(id)s'.") % {'id': resource_id,
                                 "resource_name": resource_name})
        LOG.exception(msg)
        if reservations:
            # Roll back against the same project and user the reservation
            # was made for, not whatever the request context defaults to.
            QUOTAS.rollback(
                context, reservations,
                project_id=share['project_id'],
                user_id=share['user_id'],
                share_type_id=instance_ref['share_type_id'])
@require_context
def share_instance_delete(context, instance_id, session=None,
                          need_to_update_usages=False):
    """Soft delete a share instance, and its share if it was the last one.

    :param context: request context for queries
    :param instance_id: ID of the share instance to delete
    :param session: DB session to use; a new one is obtained when omitted
    :param need_to_update_usages: when True, quota usages are updated
        through ``_update_share_instance_usages``
    """
    if session is None:
        session = get_session()
    with session.begin():
        # Passing an empty list with delete=True; presumably this removes
        # all export locations of the instance -- TODO confirm.
        share_export_locations_update(context, instance_id, [], delete=True)
        instance_ref = share_instance_get(context, instance_id,
                                          session=session)
        # Captured before the soft delete below.
        is_replica = instance_ref['replica_state'] is not None
        instance_ref.soft_delete(session=session, update_status=True)
        share = share_get(context, instance_ref['share_id'], session=session)
        # No instances left: clean up the access rules, the metadata and
        # the share itself.
        if len(share.instances) == 0:
            share_access_delete_all_by_share(context, share['id'])
            session.query(models.ShareMetadata).filter_by(
                share_id=share['id']).soft_delete()
            share.soft_delete(session=session)

        if need_to_update_usages:
            _update_share_instance_usages(context, share, instance_ref,
                                          is_replica=is_replica)
def _set_instances_share_data(context, instances, session):
if instances and not isinstance(instances, list):
instances = [instances]
instances_with_share_data = []
for instance in instances:
try:
parent_share = share_get(context, instance['share_id'],
session=session)
except exception.NotFound:
continue
instance.set_share_data(parent_share)
instances_with_share_data.append(instance)
return instances_with_share_data
@require_admin_context
def share_instances_get_all_by_host(context, host, with_share_data=False,
                                    status=None, session=None):
    """Retrieves all share instances hosted on a host.

    Matches the exact host as well as any host starting with "<host>#".

    :param context: request context for queries
    :param host: host to match
    :param with_share_data: when True, parent share data is attached to
        each instance and instances without a parent share are dropped
    :param status: when not None, only instances in this status match
    :param session: DB session to use; a new one is obtained when omitted
    :returns: list of share instances
    """
    session = session or get_session()
    # Run the query on the resolved session so that a caller-supplied
    # session is honoured for the instances as well as the parent shares.
    instances = (
        model_query(context, models.ShareInstance, session=session).filter(
            or_(
                models.ShareInstance.host == host,
                models.ShareInstance.host.like("{0}#%".format(host))
            )
        )
    )
    if status is not None:
        instances = instances.filter(models.ShareInstance.status == status)
    # Returns list of all instances that satisfy filters.
    instances = instances.all()

    if with_share_data:
        instances = _set_instances_share_data(context, instances, session)
    return instances
@require_context
def share_instances_get_all_by_share_network(context, share_network_id):
    """Returns list of share instances that belong to given share network."""
    query = model_query(context, models.ShareInstance)
    belongs_to_network = (
        models.ShareInstance.share_network_id == share_network_id)
    return query.filter(belongs_to_network).all()
@require_context
def share_instances_get_all_by_share_server(context, share_server_id,
                                            with_share_data=False):
    """Returns list of share instance with given share server.

    :param context: request context for queries
    :param share_server_id: ID of the share server to match
    :param with_share_data: when True, parent share data is attached to
        each instance and instances without a parent share are dropped
    """
    session = get_session()
    query = model_query(context, models.ShareInstance)
    on_server = models.ShareInstance.share_server_id == share_server_id
    instances = query.filter(on_server).all()

    if not with_share_data:
        return instances
    return _set_instances_share_data(context, instances, session)
@require_context
def share_instances_get_all_by_share(context, share_id):
    """Returns list of share instances that belong to given share."""
    query = model_query(context, models.ShareInstance)
    belongs_to_share = models.ShareInstance.share_id == share_id
    return query.filter(belongs_to_share).all()
@require_context
def share_instances_get_all_by_share_group_id(context, share_group_id):
    """Returns list of share instances that belong to given share group.

    One instance is returned per share of the group, namely the share's
    ``instance`` attribute, with the share data attached to it.
    """
    shares = model_query(context, models.Share).filter(
        models.Share.share_group_id == share_group_id,
    ).all()

    group_instances = []
    for share in shares:
        share_instance = share.instance
        share_instance.set_share_data(share)
        group_instances.append(share_instance)
    return group_instances
################
def _share_replica_get_with_filters(context, share_id=None, replica_id=None,
                                    replica_state=None, status=None,
                                    with_share_server=True, session=None):
    """Build a query over non-deleted share instances that are replicas.

    :param context: request context for queries
    :param share_id: when not None, restrict to instances of this share
    :param replica_id: when not None, restrict to this instance ID
    :param replica_state: when not None, restrict to this replica state;
        otherwise any instance with a non-NULL replica state matches
    :param status: when not None, restrict to this status
    :param with_share_server: when True, the share server is joined-loaded
    :param session: DB session handed to ``model_query``
    :returns: the un-executed query
    """
    instance_model = models.ShareInstance
    query = model_query(context, instance_model, session=session,
                        read_deleted="no")

    if share_id is not None:
        query = query.filter(instance_model.share_id == share_id)

    if replica_id is not None:
        query = query.filter(instance_model.id == replica_id)

    if replica_state is None:
        # No state requested: any instance carrying a replica state counts.
        query = query.filter(instance_model.replica_state.isnot(None))
    else:
        query = query.filter(instance_model.replica_state == replica_state)

    if status is not None:
        query = query.filter(instance_model.status == status)

    if with_share_server:
        query = query.options(joinedload('share_server'))

    return query
@require_context
def share_replicas_get_all(context, with_share_data=False,
                           with_share_server=True, session=None):
    """Returns replica instances for all available replicated shares."""
    session = session or get_session()

    replicas = _share_replica_get_with_filters(
        context, with_share_server=with_share_server, session=session).all()

    if not with_share_data:
        return replicas
    return _set_instances_share_data(context, replicas, session)
@require_context
def share_replicas_get_all_by_share(context, share_id,
                                    with_share_data=False,
                                    with_share_server=False, session=None):
    """Returns replica instances for a given share."""
    session = session or get_session()

    replicas = _share_replica_get_with_filters(
        context, with_share_server=with_share_server,
        share_id=share_id, session=session).all()

    if not with_share_data:
        return replicas
    return _set_instances_share_data(context, replicas, session)
@require_context
def share_replicas_get_available_active_replica(context, share_id,
                                                with_share_data=False,
                                                with_share_server=False,
                                                session=None):
    """Returns an 'active' replica instance that is 'available'.

    The first matching replica is returned, or None when there is none.
    """
    session = session or get_session()

    replica = _share_replica_get_with_filters(
        context, with_share_server=with_share_server, share_id=share_id,
        replica_state=constants.REPLICA_STATE_ACTIVE,
        status=constants.STATUS_AVAILABLE, session=session).first()

    if with_share_data and replica:
        with_data = _set_instances_share_data(context, replica, session)
        replica = with_data[0]

    return replica
@require_context
def share_replica_get(context, replica_id, with_share_data=False,
                      with_share_server=False, session=None):
    """Returns summary of requested replica if available.

    :raises: exception.ShareReplicaNotFound
    """
    session = session or get_session()

    replica = _share_replica_get_with_filters(
        context, with_share_server=with_share_server,
        replica_id=replica_id, session=session).first()
    if replica is None:
        raise exception.ShareReplicaNotFound(replica_id=replica_id)

    if with_share_data:
        with_data = _set_instances_share_data(context, replica, session)
        replica = with_data[0]

    return replica
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def share_replica_update(context, share_replica_id, values,
                         with_share_data=False, session=None):
    """Updates a share replica with specified values.

    :param context: request context for queries
    :param share_replica_id: ID of the replica (share instance) to update
    :param values: dict of values to apply
    :param with_share_data: when True, parent share data is attached to
        the returned replica
    :param session: DB session to use; a new one is obtained when omitted
    :returns: the updated replica
    """
    session = session or get_session()

    with session.begin():
        _ensure_availability_zone_exists(context, values, session,
                                         strict=False)
        replica = _share_instance_update(
            context, share_replica_id, values, session=session)

        if with_share_data:
            with_data = _set_instances_share_data(context, replica, session)
            replica = with_data[0]

    return replica
@require_context
def share_replica_delete(context, share_replica_id, session=None,
                         need_to_update_usages=True):
    """Deletes a share replica.

    Delegates to ``share_instance_delete``.
    """
    if not session:
        session = get_session()

    share_instance_delete(context, share_replica_id, session=session,
                          need_to_update_usages=need_to_update_usages)
################
@require_context
def _share_get_query(context, session=None):
    """Return a query over shares with their metadata joined-loaded.

    :param context: request context for queries
    :param session: DB session to use; a new one is obtained when None
    """
    if session is None:
        session = get_session()
    query = model_query(context, models.Share, session=session)
    return query.options(joinedload('share_metadata'))
def _process_share_filters(query, filters, project_id=None, is_public=False):
if filters is None:
filters = {}
share_filter_keys = ['share_group_id', 'snapshot_id']
instance_filter_keys = ['share_server_id', 'status', 'share_type_id',
'host', 'share_network_id']
share_filters = {}
instance_filters = {}
for k, v in filters.items():
share_filters.update({k: v}) if k in share_filter_keys else None
instance_filters.update({k: v}) if k in instance_filter_keys else None
no_key = 'key_is_absent'
def _filter_data(query, model, desired_filters):
for key, value in desired_filters.items():
filter_attr = getattr(model, key, no_key)
if filter_attr == no_key:
pass
query = query.filter(filter_attr == value)
return query
if share_filters:
query = _filter_data(query, models.Share, share_filters)
if instance_filters:
query = _filter_data(query, models.ShareInstance, instance_filters)
if project_id:
if is_public:
query = query.filter(or_(models.Share.project_id == project_id,
models.Share.is_public))
else:
query = query.filter(models.Share.project_id == project_id)
display_name = filters.get('display_name')
if display_name:
query = query.filter(
models.Share.display_name == display_name)
else:
display_name = filters.get('display_name~')
if display_name:
query = query.filter(models.Share.display_name.op('LIKE')(
u'%' + display_name + u'%'))
display_description = filters.get('display_description')
if display_description:
query = query.filter(
models.Share.display_description == display_description)
else:
display_description = filters.get('display_description~')
if display_description:
query = query.filter(models.Share.display_description.op('LIKE')(
u'%' + display_description + u'%'))
export_location_id = filters.pop('export_location_id', None)
export_location_path = filters.pop('export_location_path', None)
if export_location_id or export_location_path:
query = query.join(
models.ShareInstanceExportLocations,
models.ShareInstanceExportLocations.share_instance_id ==
models.ShareInstance.id)
if export_location_path:
query = query.filter(
models.ShareInstanceExportLocations.path ==
export_location_path)
if export_location_id:
query = query.filter(
models.ShareInstanceExportLocations.uuid ==
export_location_id)
if 'metadata' in filters:
for k, v in filters['metadata'].items():
# pylint: disable=no-member
query = query.filter(
or_(models.Share.share_metadata.any(
key=k, value=v)))
if 'extra_specs' in filters:
query = query.join(
models.ShareTypeExtraSpecs,
models.ShareTypeExtraSpecs.share_type_id ==
models.ShareInstance.share_type_id)
for k, v in filters['extra_specs'].items():
query = query.filter(or_(models.ShareTypeExtraSpecs.key == k,
models.ShareTypeExtraSpecs.value == v))
return query
def _metadata_refs(metadata_dict, meta_class):
metadata_refs = []
if metadata_dict:
for k, v in metadata_dict.items():
value = six.text_type(v) if isinstance(v, bool) else v
metadata_ref = meta_class()
metadata_ref['key'] = k
metadata_ref['value'] = value
metadata_refs.append(metadata_ref)
return metadata_refs
@require_context
def share_create(context, share_values, create_share_instance=True):
    """Create a share and, by default, its first share instance.

    ``share_values`` is deep-copied, so the caller's dict is not modified.

    :param context: request context for queries
    :param share_values: dict of share values, possibly mixed with share
        instance values
    :param create_share_instance: when True, a share instance is created
        from the instance values found in ``share_values``
    :returns: the created share, re-read from the DB
    """
    values = ensure_model_dict_has_id(copy.deepcopy(share_values))
    values['share_metadata'] = _metadata_refs(values.get('metadata'),
                                              models.ShareMetadata)
    session = get_session()
    new_share = models.Share()
    instance_values, share_only_values = _extract_share_instance_values(
        values)
    _ensure_availability_zone_exists(context, instance_values, session,
                                     strict=False)
    new_share.update(share_only_values)

    with session.begin():
        new_share.save(session=session)

        if create_share_instance:
            _share_instance_create(context, new_share['id'],
                                   instance_values, session=session)

        # NOTE(u_glide): Do so to prevent errors with relationships
        return share_get(context, new_share['id'], session=session)
@require_admin_context
def share_data_get_for_project(context, project_id, user_id,
                               share_type_id=None, session=None):
    """Return the count and total size of a project's non-deleted shares.

    :param context: request context for queries
    :param project_id: ID of the project to aggregate
    :param user_id: when set (and no share type is given), restrict to
        shares of this user
    :param share_type_id: when set, restrict to shares having an instance
        of this share type; takes precedence over ``user_id``
    :param session: DB session handed to ``model_query``
    :returns: tuple of (share count, sum of share sizes), with 0 in place
        of a missing value
    """
    query = model_query(context, models.Share,
                        func.count(models.Share.id),
                        func.sum(models.Share.size),
                        read_deleted="no",
                        session=session)
    query = query.filter_by(project_id=project_id)

    if share_type_id:
        query = query.join("instances").filter_by(share_type_id=share_type_id)
    elif user_id:
        query = query.filter_by(user_id=user_id)

    row = query.first()
    share_count = row[0] or 0
    total_size = row[1] or 0
    return (share_count, total_size)
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def share_update(context, share_id, update_values):
    """Update a share and the share instance it exposes as ``instance``.

    ``update_values`` is deep-copied, so the caller's dict is not
    modified.

    :param context: request context for queries
    :param share_id: ID of the share to update
    :param update_values: dict of share and/or share instance values
    :returns: the updated share
    """
    session = get_session()
    instance_values, share_only_values = _extract_share_instance_values(
        copy.deepcopy(update_values))
    _ensure_availability_zone_exists(context, instance_values, session,
                                     strict=False)

    with session.begin():
        share = share_get(context, share_id, session=session)

        _share_instance_update(context, share.instance['id'],
                               instance_values, session=session)

        share.update(share_only_values)
        share.save(session=session)
        return share
@require_context
def share_get(context, share_id, session=None):
    """Fetch one share by ID.

    :param context: request context for queries
    :param share_id: ID of the share to fetch
    :param session: DB session handed to ``_share_get_query``
    :returns: the share
    :raises: exception.NotFound
    """
    query = _share_get_query(context, session)
    share = query.filter_by(id=share_id).first()
    if share is not None:
        return share
    raise exception.NotFound()
def _share_get_all_with_filters(context, project_id=None, share_server_id=None,
                                share_group_id=None, filters=None,
                                is_public=False, sort_key=None,
                                sort_dir=None):
    """Returns sorted list of shares that satisfies filters.

    :param context: context to query under
    :param project_id: project id that owns shares
    :param share_server_id: share server that hosts shares
    :param share_group_id: share group that the shares belong to
    :param filters: dict of filters to specify share selection; it is
                    modified in place
    :param is_public: public shares from other projects will be added
                      to result if True
    :param sort_key: key of models.Share to be used for sorting; defaults
                     to 'created_at'
    :param sort_dir: desired direction of sorting, can be 'asc' and 'desc';
                     defaults to 'desc'
    :returns: list -- models.Share
    :raises: exception.InvalidInput
    """
    if filters is None:
        filters = {}
    sort_key = sort_key or 'created_at'
    sort_dir = sort_dir or 'desc'

    share_query = _share_get_query(context).join(
        models.ShareInstance,
        models.ShareInstance.share_id == models.Share.id
    )

    if share_group_id:
        filters['share_group_id'] = share_group_id
    if share_server_id:
        filters['share_server_id'] = share_server_id

    share_query = _process_share_filters(
        share_query, filters, project_id, is_public=is_public)

    # Sort on the share model first; fall back to the share instance
    # model when the key is not a share attribute.
    try:
        share_query = apply_sorting(
            models.Share, share_query, sort_key, sort_dir)
    except AttributeError:
        try:
            share_query = apply_sorting(
                models.ShareInstance, share_query, sort_key, sort_dir)
        except AttributeError:
            msg = _("Wrong sorting key provided - '%s'.") % sort_key
            raise exception.InvalidInput(reason=msg)

    if 'limit' in filters:
        offset = filters.get('offset', 0)
        share_query = share_query.limit(filters['limit']).offset(offset)

    # Returns list of shares that satisfy filters.
    return share_query.all()
@require_admin_context
def share_get_all(context, filters=None, sort_key=None, sort_dir=None):
    """Returns list of shares matching the filters.

    A 'project_id' entry, if present, is popped from ``filters`` and used
    as the owning project.
    """
    project_id = None
    if filters:
        project_id = filters.pop('project_id', None)
    return _share_get_all_with_filters(
        context,
        project_id=project_id,
        filters=filters, sort_key=sort_key, sort_dir=sort_dir)
@require_context
def share_get_all_by_project(context, project_id, filters=None,
                             is_public=False, sort_key=None, sort_dir=None):
    """Returns list of shares with given project ID."""
    return _share_get_all_with_filters(
        context,
        project_id=project_id,
        filters=filters,
        is_public=is_public,
        sort_key=sort_key,
        sort_dir=sort_dir,
    )
@require_context
def share_get_all_by_share_group_id(context, share_group_id,
                                    filters=None, sort_key=None,
                                    sort_dir=None):
    """Returns list of shares with given group ID."""
    return _share_get_all_with_filters(
        context,
        share_group_id=share_group_id,
        filters=filters,
        sort_key=sort_key,
        sort_dir=sort_dir,
    )
@require_context
def share_get_all_by_share_server(context, share_server_id, filters=None,
                                  sort_key=None, sort_dir=None):
    """Returns list of shares with given share server."""
    return _share_get_all_with_filters(
        context,
        share_server_id=share_server_id,
        filters=filters,
        sort_key=sort_key,
        sort_dir=sort_dir,
    )
@require_context
def share_delete(context, share_id):
    """Soft delete a share that has no share instances, and its metadata.

    :param context: request context for queries
    :param share_id: ID of the share to delete
    :raises: exception.InvalidShare if the share still has instances
    """
    session = get_session()

    with session.begin():
        share = share_get(context, share_id, session)
        instance_count = len(share.instances)

        if instance_count > 0:
            msg = _("Share %(id)s has %(count)s share instances.") % {
                'id': share_id, 'count': instance_count}
            raise exception.InvalidShare(msg)

        share.soft_delete(session=session)

        metadata_query = session.query(models.ShareMetadata)
        metadata_query.filter_by(share_id=share_id).soft_delete()
###################
def _share_access_get_query(context, session, values, read_deleted='no'):
    """Build a query for access records matching ``values``.

    The access rules metadata is joined-loaded.

    :param context: request context for queries
    :param session: DB session handed to ``model_query``
    :param values: dict of column/value pairs to filter by
    :param read_deleted: deleted-rows policy handed to ``model_query``
    """
    access_query = model_query(
        context, models.ShareAccessMapping, session=session,
        read_deleted=read_deleted)
    access_query = access_query.options(
        joinedload('share_access_rules_metadata'))
    return access_query.filter_by(**values)
def _share_instance_access_query(context, session, access_id=None,
                                 instance_id=None):
    """Build a query for non-deleted share instance access mappings.

    :param context: request context for queries
    :param session: DB session handed to ``model_query``
    :param access_id: when not None, restrict to this access rule
    :param instance_id: when not None, restrict to this share instance
    """
    criteria = {'deleted': 'False'}
    if access_id is not None:
        criteria['access_id'] = access_id
    if instance_id is not None:
        criteria['share_instance_id'] = instance_id

    mapping_query = model_query(
        context, models.ShareInstanceAccessMapping, session=session)
    return mapping_query.filter_by(**criteria)
def _share_access_metadata_get_item(context, access_id, key, session=None):
    """Fetch one metadata item of an access rule by key.

    :param context: request context for queries
    :param access_id: ID of the access rule
    :param key: metadata key to look up
    :param session: DB session handed to the query helper
    :returns: the metadata item
    :raises: exception.ShareAccessMetadataNotFound
    """
    item_query = _share_access_metadata_get_query(
        context, access_id, session=session)
    item = item_query.filter_by(key=key).first()
    if item:
        return item
    raise exception.ShareAccessMetadataNotFound(
        metadata_key=key, access_id=access_id)
def _share_access_metadata_get_query(context, access_id, session=None):
    """Build a query for the non-deleted metadata of an access rule.

    The related access record is joined-loaded.

    :param context: request context for queries
    :param access_id: ID of the access rule
    :param session: DB session handed to ``model_query``
    """
    metadata_query = model_query(
        context, models.ShareAccessRulesMetadata, session=session,
        read_deleted="no")
    metadata_query = metadata_query.filter_by(access_id=access_id)
    return metadata_query.options(joinedload('access'))
@require_context
def share_access_metadata_update(context, access_id, metadata):
    """Create or update metadata items of an access rule.

    :param context: request context for queries
    :param access_id: ID of the access rule
    :param metadata: dict of metadata keys and values to set
    :returns: the ``metadata`` dict that was passed in
    """
    session = get_session()

    with session.begin():
        # Now update all existing items with new values, or create new meta
        # objects
        for meta_key, meta_value in metadata.items():
            # update the value whether it exists or not
            changes = {"value": meta_value}
            try:
                meta_ref = _share_access_metadata_get_item(
                    context, access_id, meta_key, session=session)
            except exception.ShareAccessMetadataNotFound:
                # A brand new item also needs its key and owner set.
                meta_ref = models.ShareAccessRulesMetadata()
                changes["key"] = meta_key
                changes["access_id"] = access_id

            meta_ref.update(changes)
            meta_ref.save(session=session)

        return metadata
@require_context
def share_access_metadata_delete(context, access_id, key):
    """Soft delete one metadata item of an access rule.

    :param context: request context for queries
    :param access_id: ID of the access rule
    :param key: key of the metadata item to delete
    :raises: exception.ShareAccessMetadataNotFound
    """
    session = get_session()
    with session.begin():
        item = _share_access_metadata_get_item(
            context, access_id, key, session=session)
        item.soft_delete(session)
@require_context
def share_access_create(context, values):
    """Create an access rule and map it to every instance of its share.

    :param context: request context for queries
    :param values: dict of access rule values; must contain 'share_id'
    :returns: the created access record, re-read after the transaction
    """
    values = ensure_model_dict_has_id(values)
    session = get_session()

    with session.begin():
        values['share_access_rules_metadata'] = _metadata_refs(
            values.get('metadata'), models.ShareAccessRulesMetadata)

        rule = models.ShareAccessMapping()
        rule.update(values)
        rule.save(session=session)

        share = share_get(context, values['share_id'], session=session)
        for share_instance in share.instances:
            mapping_values = {
                'share_instance_id': share_instance['id'],
                'access_id': rule['id'],
            }
            _share_instance_access_create(mapping_values, session)

    return share_access_get(context, rule['id'])
@require_context
def share_instance_access_create(context, values, share_instance_id):
    """Map an access rule to a share instance, creating the rule if needed.

    An existing rule with the same share, access type and access target
    is reused; otherwise a new rule is created from ``values``.

    :param context: request context for queries
    :param values: dict of access rule values; must contain 'share_id',
        'access_type' and 'access_to'
    :param share_instance_id: ID of the share instance to map the rule to
    :returns: the access record, re-read after the transaction
    """
    values = ensure_model_dict_has_id(values)
    session = get_session()

    with session.begin():
        lookup = {
            'share_id': values['share_id'],
            'access_type': values['access_type'],
            'access_to': values['access_to'],
        }
        existing_rules = _share_access_get_query(
            context, session, lookup).all()

        if existing_rules:
            rule = existing_rules[0]
        else:
            rule = models.ShareAccessMapping()
            rule.update(values)
            rule.save(session=session)

        mapping_values = {
            'share_instance_id': share_instance_id,
            'access_id': rule['id'],
        }
        _share_instance_access_create(mapping_values, session)

    return share_access_get(context, rule['id'])
@require_context
def share_instance_access_copy(context, share_id, instance_id, session=None):
    """Copy access rules from share to share instance.

    Creates one instance-level mapping on ``instance_id`` for every access
    rule found for ``share_id``.

    :param session: optional DB session; a new one is obtained if omitted.
    :returns: the list of share access rules that were mapped.
    """
    session = session or get_session()
    rules = _share_access_get_query(
        context, session, {'share_id': share_id}).all()
    for rule in rules:
        _share_instance_access_create(
            {'share_instance_id': instance_id, 'access_id': rule['id']},
            session)
    return rules
def _share_instance_access_create(values, session):
    """Persist a new share-instance access mapping built from ``values``.

    :param values: dict of mapping attributes; passed through
        ``ensure_model_dict_has_id`` before being applied.
    :param session: DB session used to save the row.
    :returns: the saved ShareInstanceAccessMapping object.
    """
    mapping = models.ShareInstanceAccessMapping()
    mapping_values = ensure_model_dict_has_id(values)
    mapping.update(mapping_values)
    mapping.save(session=session)
    return mapping
@require_context
def share_access_get(context, access_id, session=None):
    """Get access record.

    :param access_id: id of the access rule to fetch.
    :param session: optional DB session; a new one is obtained if omitted.
    :returns: the first access rule matching ``access_id``.
    :raises: NotFound if no rule matches.
    """
    session = session or get_session()
    query = _share_access_get_query(context, session, {'id': access_id})
    access = query.first()
    if not access:
        raise exception.NotFound()
    return access
@require_context
def share_instance_access_get(context, access_id, instance_id,
                              with_share_access_data=True):
    """Get access record.

    :param access_id: id of the access rule.
    :param instance_id: id of the share instance.
    :param with_share_access_data: when true, the mapping is additionally
        populated with the data of its share access rule.
    :returns: the instance access mapping.
    :raises: NotFound if no mapping matches.
    """
    session = get_session()
    query = _share_instance_access_query(
        context, session, access_id, instance_id)
    access = query.first()
    if access is None:
        raise exception.NotFound()

    if not with_share_access_data:
        return access
    # The helper wraps a single mapping into a list; unwrap it again.
    return _set_instances_share_access_data(context, access, session)[0]
@require_context
def share_access_get_all_for_share(context, share_id, filters=None,
                                   session=None):
    """Return the access rules of a share that have instance mappings.

    :param share_id: id of the share whose rules are listed.
    :param filters: optional dict; only the ``metadata`` key (a dict of
        metadata key/value pairs) is consulted here.
    :param session: optional DB session; a new one is obtained if omitted.
    :returns: list of matching access rules.
    """
    filters = filters or {}
    session = session or get_session()

    query = _share_access_get_query(context, session, {'share_id': share_id})
    mapping = models.ShareAccessMapping
    query = query.filter(mapping.instance_mappings.any())

    if 'metadata' in filters:
        for meta_key, meta_value in filters['metadata'].items():
            metadata_match = mapping.share_access_rules_metadata.any(
                key=meta_key, value=meta_value)
            # NOTE(review): or_() receives a single clause and the filters
            # are chained, so each key/value pair narrows the result
            # further - confirm that this is the intended semantics.
            query = query.filter(or_(metadata_match))

    return query.all()
@require_context
def share_access_get_all_for_instance(context, instance_id, filters=None,
                                      with_share_access_data=True,
                                      session=None):
    """Get all access rules related to a certain share instance.

    :param instance_id: id of the share instance.
    :param filters: optional dict of exact-match filters; it is deep-copied,
        so the caller's dict is left untouched.
    :param with_share_access_data: when true, each mapping is additionally
        populated with the data of its share access rule.
    :param session: optional DB session; a new one is obtained if omitted.
    :returns: list of instance access mappings.
    """
    session = session or get_session()

    # Work on a private copy and always pin the query to this instance.
    filters = copy.deepcopy(filters) if filters else {}
    filters['share_instance_id'] = instance_id

    legal_filter_keys = ('id', 'share_instance_id', 'access_id', 'state')
    query = exact_filter(
        _share_instance_access_query(context, session),
        models.ShareInstanceAccessMapping, filters, legal_filter_keys)
    instance_accesses = query.all()

    if not with_share_access_data:
        return instance_accesses
    return _set_instances_share_access_data(
        context, instance_accesses, session)
def _set_instances_share_access_data(context, instance_accesses, session):
    """Attach share access rule data to instance access mappings.

    :param instance_accesses: a list of mappings, or a single mapping
        (which is wrapped into a one-element list).
    :returns: the list of mappings, each updated through
        ``set_share_access_data``.
    """
    is_single = (
        bool(instance_accesses) and not isinstance(instance_accesses, list))
    if is_single:
        instance_accesses = [instance_accesses]

    for mapping in instance_accesses:
        mapping.set_share_access_data(
            share_access_get(context, mapping['access_id'], session=session))

    return instance_accesses
def _set_instances_snapshot_access_data(context, instance_accesses, session):
    """Attach snapshot access rule data to instance access mappings.

    :param instance_accesses: a list of mappings, or a single mapping
        (which is wrapped into a one-element list).
    :returns: the list of mappings, each updated through
        ``set_snapshot_access_data``.
    """
    is_single = (
        bool(instance_accesses) and not isinstance(instance_accesses, list))
    if is_single:
        instance_accesses = [instance_accesses]

    for mapping in instance_accesses:
        mapping.set_snapshot_access_data(
            share_snapshot_access_get(
                context, mapping['access_id'], session=session))

    return instance_accesses
@require_context
def share_access_get_all_by_type_and_access(context, share_id, access_type,
                                            access):
    """List a share's access rules matching an access type and target.

    :param access: value compared against the rule's ``access_to`` field.
    :returns: list of matching access rules.
    """
    session = get_session()
    criteria = {
        'share_id': share_id,
        'access_type': access_type,
        'access_to': access,
    }
    return _share_access_get_query(context, session, criteria).all()
@require_context
def share_access_check_for_existing_access(context, share_id, access_type,
                                           access_to):
    """Check whether a share already has a rule for the given access.

    Delegates to ``_check_for_existing_access`` with the ``'share'``
    resource kind and returns its result.
    """
    return _check_for_existing_access(
        context,
        resource='share',
        resource_id=share_id,
        access_type=access_type,
        access_to=access_to)
def _check_for_existing_access(context, resource, resource_id, access_type,
                               access_to):
    """Tell whether an equivalent access rule already exists on a resource.

    For ``ip`` rules the comparison is done on parsed networks rather than
    on raw strings, so e.g. ``10.0.0.1`` and ``10.0.0.1/32`` are treated as
    the same rule. All other access types are matched on the exact
    ``access_to`` value.

    :param resource: ``'share'`` selects share access rules; any other
        value selects share snapshot access rules.
    :param resource_id: id of the share or snapshot.
    :param access_type: access type of the rule being checked.
    :param access_to: access target of the rule being checked.
    :returns: True if a matching rule exists, False otherwise.
    :raises: ValueError (from ``ipaddress.ip_network``) if an ``ip`` value
        that has to be compared cannot be parsed as a network.
    """
    session = get_session()
    if resource == 'share':
        query_method = _share_access_get_query
        access_to_field = models.ShareAccessMapping.access_to
    else:
        query_method = _share_snapshot_access_get_query
        access_to_field = models.ShareSnapshotAccessMapping.access_to

    filters = {'%s_id' % resource: resource_id, 'access_type': access_type}

    with session.begin():
        if access_type != 'ip':
            filters['access_to'] = access_to
            return query_method(context, session, filters).count() > 0

        # Narrow the candidates in SQL by address prefix (the part before
        # any '/'); the exact network comparison is done in Python below.
        rules = query_method(context, session, filters).filter(
            access_to_field.startswith(access_to.split('/')[0])).all()
        if not rules:
            # Nothing to compare against, so access_to is not parsed.
            return False

        # Parse the requested network once instead of once per candidate.
        requested = ipaddress.ip_network(six.text_type(access_to))
        return any(
            requested == ipaddress.ip_network(
                six.text_type(rule['access_to']))
            for rule in rules)
@require_context
def share_access_delete_all_by_share(context, share_id):
    """Soft-delete every access rule row belonging to the given share."""
    db_session = get_session()
    with db_session.begin():
        share_rules = db_session.query(models.ShareAccessMapping).filter_by(
            share_id=share_id)
        share_rules.soft_delete()
@require_context
def share_instance_access_delete(context, mapping_id):