OpenStack Block Storage (Cinder)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

7324 lines
265 KiB

# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2014 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of SQLAlchemy backend."""
import collections
from collections import abc
import contextlib
import datetime as dt
import functools
import itertools
import re
import sys
import uuid
from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db import options
from oslo_db.sqlalchemy import enginefacade
from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
osprofiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')
import sqlalchemy
from sqlalchemy import MetaData
from sqlalchemy import or_, and_, case
from sqlalchemy.orm import joinedload, undefer_group, load_only
from sqlalchemy.orm import RelationshipProperty
from sqlalchemy import sql
from sqlalchemy.sql.expression import bindparam
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import true
from sqlalchemy.sql import func
from sqlalchemy.sql import sqltypes
from cinder.api import common
from cinder.common import sqlalchemyutils
from cinder import db
from cinder.db.sqlalchemy import models
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.objects import fields
from cinder import utils
from cinder.volume import volume_utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Default to a local SQLite file unless [database]/connection overrides it.
options.set_defaults(CONF, connection='sqlite:///$state_path/cinder.sqlite')
# Module-wide oslo.db enginefacade transaction context manager.
main_context_manager = enginefacade.transaction_context()
def configure(conf):
    """Apply database configuration and wire up optional osprofiler tracing.

    :param conf: ConfigOpts holding the [database] (and, indirectly,
                 [profiler]) option groups.
    """
    main_context_manager.configure(**dict(conf.database))
    # NOTE(geguileo): To avoid a cyclical dependency we import the
    # group here. Dependency cycle is objects.base requires db.api,
    # which requires db.sqlalchemy.api, which requires service which
    # requires objects.base
    CONF.import_group("profiler", "cinder.service")
    if CONF.profiler.enabled:
        if CONF.profiler.trace_sqlalchemy:
            # Register the hook with the context manager so tracing is
            # added to every engine it creates.  A bare lambda expression
            # here would be dead code and silently disable tracing.
            main_context_manager.append_on_engine_create(
                lambda eng: osprofiler_sqlalchemy.add_tracing(
                    sqlalchemy, eng, "db"))
def get_engine(use_slave=False):
    """Return the SQLAlchemy engine from the legacy enginefacade."""
    facade = main_context_manager._factory.get_legacy_facade()
    return facade.get_engine(use_slave=use_slave)
def get_session(use_slave=False, **kwargs):
    """Return a new SQLAlchemy session from the legacy enginefacade."""
    facade = main_context_manager._factory.get_legacy_facade()
    return facade.get_session(use_slave=use_slave, **kwargs)
def dispose_engine():
    """Dispose of the engine's underlying connection pool."""
    engine = get_engine()
    engine.dispose()
_DEFAULT_QUOTA_NAME = 'default'
def get_backend():
    """The backend is this module itself."""
    this_module = sys.modules[__name__]
    return this_module
def is_admin_context(context):
    """Indicates if the request context is an administrator."""
    if context:
        return context.is_admin
    raise exception.CinderException(
        'Use of empty request context is deprecated')
def is_user_context(context):
    """Indicates if the request context is a normal user."""
    if not context or context.is_admin:
        return False
    # Both identifiers must be present for a usable user context.
    return bool(context.user_id and context.project_id)
def authorize_project_context(context, project_id):
    """Ensures a request has permission to access the given project."""
    if not is_user_context(context):
        return
    if not context.project_id or context.project_id != project_id:
        raise exception.NotAuthorized()
def authorize_user_context(context, user_id):
    """Ensures a request has permission to access the given user."""
    if not is_user_context(context):
        return
    if not context.user_id or context.user_id != user_id:
        raise exception.NotAuthorized()
def authorize_quota_class_context(context, class_name):
    """Ensures a request has permission to access the given quota class."""
    if not is_user_context(context):
        return
    if not context.quota_class or context.quota_class != class_name:
        raise exception.NotAuthorized()
def require_admin_context(f):
    """Decorator to require admin request context.

    The first argument to the wrapped function must be the context.

    :raises exception.AdminRequired: if the context is not an admin context.
    """
    # functools.wraps keeps the wrapped function's metadata, consistent with
    # the other decorators in this module (require_volume_exists, etc.).
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        if not is_admin_context(args[0]):
            raise exception.AdminRequired()
        return f(*args, **kwargs)
    return wrapper
def require_context(f):
    """Decorator to require *any* user or admin context.

    This does no authorization for user or project access matching, see
    :py:func:`authorize_project_context` and
    :py:func:`authorize_user_context`.

    The first argument to the wrapped function must be the context.

    :raises exception.NotAuthorized: if the context is neither admin nor user.
    """
    # functools.wraps keeps the wrapped function's metadata, consistent with
    # the other decorators in this module.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        if not is_admin_context(args[0]) and not is_user_context(args[0]):
            raise exception.NotAuthorized()
        return f(*args, **kwargs)
    return wrapper
def require_volume_exists(f):
    """Decorator to require the specified volume to exist.

    Requires the wrapped function to use context and volume_id as
    their first two arguments.
    """
    @functools.wraps(f)
    def wrapper(context, volume_id, *args, **kwargs):
        if resource_exists(context, models.Volume, volume_id):
            return f(context, volume_id, *args, **kwargs)
        raise exception.VolumeNotFound(volume_id=volume_id)
    return wrapper
def require_snapshot_exists(f):
    """Decorator to require the specified snapshot to exist.

    Requires the wrapped function to use context and snapshot_id as
    their first two arguments.
    """
    @functools.wraps(f)
    def wrapper(context, snapshot_id, *args, **kwargs):
        if resource_exists(context, models.Snapshot, snapshot_id):
            return f(context, snapshot_id, *args, **kwargs)
        raise exception.SnapshotNotFound(snapshot_id=snapshot_id)
    return wrapper
def require_backup_exists(f):
    """Decorator to require the specified backup to exist.

    (Docstring fixed: this guards backups, not snapshots.)

    Requires the wrapped function to use context and backup_id as
    their first two arguments.
    """
    @functools.wraps(f)
    def wrapper(context, backup_id, *args, **kwargs):
        if not resource_exists(context, models.Backup, backup_id):
            raise exception.BackupNotFound(backup_id=backup_id)
        return f(context, backup_id, *args, **kwargs)
    return wrapper
def handle_db_data_error(f):
    """Decorator translating oslo.db DBDataError into exception.Invalid.

    A DBDataError (e.g. a value too large for its column) is logged and
    re-raised as a cinder Invalid exception.
    """
    # functools.wraps keeps the wrapped function's metadata, consistent with
    # the other decorators in this module.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except db_exc.DBDataError:
            msg = _('Error writing field to database')
            LOG.exception(msg)
            raise exception.Invalid(msg)
    return wrapper
def model_query(context, model, *args, **kwargs):
    """Query helper that accounts for context's `read_deleted` field.

    :param context: context to query under
    :param model: Model to query. Must be a subclass of ModelBase.
    :param args: Arguments to query. If None - model is used.
    :param read_deleted: if present, overrides context's read_deleted field;
            one of 'no', 'yes', 'only' or 'int_no'.
    :param project_only: if present and context is user-type, then restrict
            query to match the context's project_id.
    :param session: if present, the session to build the query on; otherwise
            a new session is created.
    """
    session = kwargs.get('session') or get_session()
    read_deleted = kwargs.get('read_deleted') or context.read_deleted
    project_only = kwargs.get('project_only')
    query = session.query(model, *args)
    if read_deleted == 'no':
        query = query.filter_by(deleted=False)
    elif read_deleted == 'yes':
        pass  # omit the filter to include deleted and active
    elif read_deleted == 'only':
        query = query.filter_by(deleted=True)
    elif read_deleted == 'int_no':
        # For models storing `deleted` as an integer flag: 0 == not deleted.
        query = query.filter_by(deleted=0)
    else:
        raise Exception(
            _("Unrecognized read_deleted value '%s'") % read_deleted)
    if project_only and is_user_context(context):
        if model is models.VolumeAttachment:
            # NOTE(dulek): In case of VolumeAttachment, we need to join
            # `project_id` through `volume` relationship.
            query = query.filter(models.Volume.project_id ==
                                 context.project_id)
        else:
            query = query.filter_by(project_id=context.project_id)
    return query
def _sync_volumes(context, project_id, session, volume_type_id=None,
                  volume_type_name=None):
    """Return the project's current volume count as a quota-sync dict."""
    count, _gigs = _volume_data_get_for_project(
        context, project_id, volume_type_id=volume_type_id, session=session)
    # Per-type quotas get a key suffixed with the type name.
    key = 'volumes' if not volume_type_name else (
        'volumes_' + volume_type_name)
    return {key: count}
def _sync_snapshots(context, project_id, session, volume_type_id=None,
                    volume_type_name=None):
    """Return the project's current snapshot count as a quota-sync dict."""
    count, _gigs = _snapshot_data_get_for_project(
        context, project_id, volume_type_id=volume_type_id, session=session)
    # Per-type quotas get a key suffixed with the type name.
    key = 'snapshots' if not volume_type_name else (
        'snapshots_' + volume_type_name)
    return {key: count}
def _sync_backups(context, project_id, session, volume_type_id=None,
                  volume_type_name=None):
    """Return the project's current backup count as a quota-sync dict."""
    count, _gigs = _backup_data_get_for_project(
        context, project_id, volume_type_id=volume_type_id, session=session)
    # Backups are not tracked per volume type, so the key is never suffixed.
    return {'backups': count}
def _sync_gigabytes(context, project_id, session, volume_type_id=None,
                    volume_type_name=None):
    """Return the project's gigabytes in use as a quota-sync dict.

    Snapshot gigabytes are included unless CONF.no_snapshot_gb_quota is set.
    """
    _count, vol_gigs = _volume_data_get_for_project(
        context, project_id, volume_type_id=volume_type_id, session=session)
    key = 'gigabytes' if not volume_type_name else (
        'gigabytes_' + volume_type_name)
    if CONF.no_snapshot_gb_quota:
        return {key: vol_gigs}
    _count, snap_gigs = _snapshot_data_get_for_project(
        context, project_id, volume_type_id=volume_type_id, session=session)
    return {key: vol_gigs + snap_gigs}
def _sync_consistencygroups(context, project_id, session,
                            volume_type_id=None,
                            volume_type_name=None):
    """Return the project's consistency group count as a quota-sync dict."""
    _count, group_count = _consistencygroup_data_get_for_project(
        context, project_id, session=session)
    return {'consistencygroups': group_count}
def _sync_groups(context, project_id, session,
                 volume_type_id=None,
                 volume_type_name=None):
    """Return the project's group count as a quota-sync dict."""
    _count, group_count = _group_data_get_for_project(
        context, project_id, session=session)
    return {'groups': group_count}
def _sync_backup_gigabytes(context, project_id, session, volume_type_id=None,
                           volume_type_name=None):
    """Return the project's backup gigabytes in use as a quota-sync dict."""
    _count, backup_gigs = _backup_data_get_for_project(
        context, project_id, volume_type_id=volume_type_id, session=session)
    return {'backup_gigabytes': backup_gigs}
# Maps the sync-function name stored on each quota resource to the callable
# that recalculates its in-use count (looked up by quota_reserve).
QUOTA_SYNC_FUNCTIONS = {
    '_sync_volumes': _sync_volumes,
    '_sync_snapshots': _sync_snapshots,
    '_sync_gigabytes': _sync_gigabytes,
    '_sync_consistencygroups': _sync_consistencygroups,
    '_sync_backups': _sync_backups,
    '_sync_backup_gigabytes': _sync_backup_gigabytes,
    '_sync_groups': _sync_groups,
}
###################
def _clean_filters(filters):
return {k: v for k, v in filters.items() if v is not None}
def _filter_host(field, value, match_level=None):
    """Generate a filter condition for host and cluster fields.

    Levels are:
    - 'pool': Will search for an exact match
    - 'backend': Will search for exact match and value#*
    - 'host': Will search for exact match, value@* and value#*

    If no level is provided we'll determine it based on the value we want to
    match:
    - 'pool': If '#' is present in value
    - 'backend': If '@' is present in value and '#' is not present
    - 'host': In any other case

    :param field: ORM field.  Ex: objects.Volume.model.host
    :param value: String to compare with
    :param match_level: 'pool', 'backend', or 'host'
    """
    # If we don't set level we'll try to determine it automatically. LIKE
    # operations are expensive, so we try to reduce them to the minimum.
    if match_level is None:
        if '#' in value:
            match_level = 'pool'
        elif '@' in value:
            match_level = 'backend'
        else:
            match_level = 'host'
    # Mysql is not doing case sensitive filtering, so we force it
    conn_str = CONF.database.connection
    # Only genuine MySQL URLs ('mysql:' or 'mysql+driver:') need the BINARY
    # treatment; conn_str[5] is the character right after 'mysql'.
    if conn_str.startswith('mysql') and conn_str[5] in ['+', ':']:
        cmp_value = func.binary(value)
        like_op = 'LIKE BINARY'
    else:
        cmp_value = value
        like_op = 'LIKE'
    conditions = [field == cmp_value]
    if match_level != 'pool':
        # Also match any pool under this host/backend: 'value#...'
        conditions.append(field.op(like_op)(value + '#%'))
        if match_level == 'host':
            # Also match any backend on this host: 'value@...'
            conditions.append(field.op(like_op)(value + '@%'))
    return or_(*conditions)
def _filter_time_comparison(field, time_filter_dict):
    """Generate a filter condition for time comparison operators.

    Unknown operator keys are silently ignored, matching the previous
    if/elif chain behavior.
    """
    operator_map = {'gt': '>', 'gte': '>=', 'eq': '=',
                    'neq': '!=', 'lt': '<', 'lte': '<='}
    conditions = []
    for operator, raw_time in time_filter_dict.items():
        filter_time = timeutils.normalize_time(raw_time)
        sql_operator = operator_map.get(operator)
        if sql_operator is not None:
            conditions.append(field.op(sql_operator)(filter_time))
    return or_(*conditions)
def _clustered_bool_field_filter(query, field_name, filter_value):
    """Filter a Service query on a boolean field honoring cluster membership.

    :param query: Service query to add the filter to.
    :param field_name: name of the boolean column present on both Service
                       and Cluster models (e.g. 'disabled', 'frozen').
    :param filter_value: True/False to filter on, or None to skip filtering.
    """
    # Now that we have clusters, a service is disabled/frozen if the service
    # doesn't belong to a cluster or if it belongs to a cluster and the cluster
    # itself is disabled/frozen.
    if filter_value is not None:
        # Truthy when the service's own flag is set (clusterless case) or
        # when its non-deleted cluster's flag is set.
        query_filter = or_(
            and_(models.Service.cluster_name.is_(None),
                 getattr(models.Service, field_name)),
            and_(models.Service.cluster_name.isnot(None),
                 sql.exists().where(and_(
                     models.Cluster.name == models.Service.cluster_name,
                     models.Cluster.binary == models.Service.binary,
                     ~models.Cluster.deleted,
                     getattr(models.Cluster, field_name)))))
        if not filter_value:
            # Caller asked for the False case: negate the whole condition.
            query_filter = ~query_filter
        query = query.filter(query_filter)
    return query
def _service_query(context, session=None, read_deleted='no', host=None,
                   cluster_name=None, is_up=None, host_or_cluster=None,
                   backend_match_level=None, disabled=None, frozen=None,
                   **filters):
    """Build the base Service query applying all supported filters.

    Returns None when `filters` contains a key that is not a valid Service
    model filter, so callers can distinguish "bad filter" from "no rows".
    """
    filters = _clean_filters(filters)
    if filters and not is_valid_model_filters(models.Service, filters):
        return None
    query = model_query(context, models.Service, session=session,
                        read_deleted=read_deleted)
    # Host and cluster are particular cases of filters, because we must
    # retrieve not only exact matches (single backend configuration), but also
    # match those that have the backend defined (multi backend configuration).
    if host:
        query = query.filter(_filter_host(models.Service.host, host,
                                          backend_match_level))
    if cluster_name:
        query = query.filter(_filter_host(models.Service.cluster_name,
                                          cluster_name, backend_match_level))
    if host_or_cluster:
        query = query.filter(or_(
            _filter_host(models.Service.host, host_or_cluster,
                         backend_match_level),
            _filter_host(models.Service.cluster_name, host_or_cluster,
                         backend_match_level),
        ))
    # disabled/frozen must take the service's cluster (if any) into account.
    query = _clustered_bool_field_filter(query, 'disabled', disabled)
    query = _clustered_bool_field_filter(query, 'frozen', frozen)
    if filters:
        query = query.filter_by(**filters)
    if is_up is not None:
        # A service counts as "up" when created or updated within the
        # service expiration window.
        date_limit = utils.service_expired_time()
        svc = models.Service
        filter_ = or_(
            and_(svc.created_at.isnot(None), svc.created_at >= date_limit),
            and_(svc.updated_at.isnot(None), svc.updated_at >= date_limit))
        query = query.filter(filter_ == is_up)
    return query
@require_admin_context
def service_destroy(context, service_id):
    """Soft-delete the service with the given id.

    :returns: the dict of values applied by the soft delete.
    :raises ServiceNotFound: if no service matched the id.
    """
    delete_values = models.Service.delete_values()
    if not _service_query(context, id=service_id).update(delete_values):
        raise exception.ServiceNotFound(service_id=service_id)
    return delete_values
@require_admin_context
def service_get(context, service_id=None, backend_match_level=None, **filters):
    """Get a service that matches the criteria.

    A possible filter is is_up=True and it will filter nodes that are down.

    :param service_id: Id of the service.
    :param filters: Filters for the query in the form of key/value.
    :param backend_match_level: 'pool', 'backend', or 'host' for host and
                                cluster filters (as defined in _filter_host
                                method)
    :raise ServiceNotFound: If service doesn't exist.
    """
    query = _service_query(context, backend_match_level=backend_match_level,
                           id=service_id, **filters)
    service = query.first() if query else None
    if service:
        return service
    # Build the most meaningful identifier we can for the error message.
    serv_id = service_id or filters.get('topic') or filters.get('binary')
    raise exception.ServiceNotFound(service_id=serv_id,
                                    host=filters.get('host'))
@require_admin_context
def service_get_all(context, backend_match_level=None, **filters):
    """Get all services that match the criteria.

    A possible filter is is_up=True and it will filter nodes that are down.

    :param filters: Filters for the query in the form of key/value.
    :param backend_match_level: 'pool', 'backend', or 'host' for host and
                                cluster filters (as defined in _filter_host
                                method)
    """
    query = _service_query(context, backend_match_level=backend_match_level,
                           **filters)
    if not query:
        # An invalid filter was supplied; there can be no matches.
        return []
    return query.all()
@require_admin_context
def service_get_by_uuid(context, service_uuid):
    """Return the service with the given uuid or raise ServiceNotFound."""
    service = (model_query(context, models.Service)
               .filter_by(uuid=service_uuid)
               .first())
    if service is None:
        raise exception.ServiceNotFound(service_id=service_uuid)
    return service
@require_admin_context
def service_create(context, values):
    """Create a service row from the values dictionary."""
    service = models.Service()
    service.update(values)
    if not CONF.enable_new_services:
        # Deployments can require new services to start disabled.
        service.disabled = True
    session = get_session()
    with session.begin():
        service.save(session)
    return service
@require_admin_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def service_update(context, service_id, values):
    """Update the service identified by service_id with `values`.

    :raises ServiceNotFound: if no service row was updated.
    """
    query = _service_query(context, id=service_id)
    if 'disabled' in values:
        # When toggling `disabled`, record the change time in modified_at
        # while keeping updated_at at its current DB value unless the caller
        # explicitly provides one.
        entity = query.column_descriptions[0]['entity']
        values = values.copy()
        values['modified_at'] = values.get('modified_at', timeutils.utcnow())
        values['updated_at'] = values.get('updated_at', entity.updated_at)
    result = query.update(values)
    if not result:
        raise exception.ServiceNotFound(service_id=service_id)
###################
@require_admin_context
def is_backend_frozen(context, host, cluster_name):
    """Check if a storage backend is frozen based on host and cluster_name."""
    if cluster_name:
        model = models.Cluster
        name_match = model.name == volume_utils.extract_host(cluster_name)
    else:
        model = models.Service
        name_match = model.host == volume_utils.extract_host(host)
    # Deleted rows never count as frozen.
    condition = and_(name_match, ~model.deleted, model.frozen)
    return get_session().query(sql.exists().where(condition)).scalar()
###################
def _cluster_query(context, is_up=None, get_services=False,
                   services_summary=False, read_deleted='no',
                   name_match_level=None, name=None, session=None, **filters):
    """Build the base Cluster query applying all supported filters.

    Returns None when `filters` contains a key that is not a valid Cluster
    model filter.
    """
    filters = _clean_filters(filters)
    if filters and not is_valid_model_filters(models.Cluster, filters):
        return None
    query = model_query(context, models.Cluster, session=session,
                        read_deleted=read_deleted)
    # Cluster is a special case of filter, because we must match exact match
    # as well as hosts that specify the backend
    if name:
        query = query.filter(_filter_host(models.Cluster.name, name,
                                          name_match_level))
    if filters:
        query = query.filter_by(**filters)
    if services_summary:
        query = query.options(undefer_group('services_summary'))
        # We bind the expiration time to now (as it changes with each query)
        # and is required by num_down_hosts
        query = query.params(expired=utils.service_expired_time())
    elif 'num_down_hosts' in filters:
        # num_down_hosts also needs the bound expiration parameter even when
        # the summary columns are not loaded.
        query = query.params(expired=utils.service_expired_time())
    if get_services:
        query = query.options(joinedload('services'))
    if is_up is not None:
        # A cluster is "up" when its last heartbeat is recent enough.
        date_limit = utils.service_expired_time()
        filter_ = and_(models.Cluster.last_heartbeat.isnot(None),
                       models.Cluster.last_heartbeat >= date_limit)
        query = query.filter(filter_ == is_up)
    return query
@require_admin_context
def cluster_get(context, id=None, is_up=None, get_services=False,
                services_summary=False, read_deleted='no',
                name_match_level=None, **filters):
    """Get a cluster that matches the criteria.

    :param id: Id of the cluster.
    :param is_up: Boolean value to filter based on the cluster's up status.
    :param get_services: If we want to load all services from this cluster.
    :param services_summary: If we want to load num_hosts and
                             num_down_hosts fields.
    :param read_deleted: Filtering based on delete status. Default value is
                         "no".
    :param filters: Field based filters in the form of key/value.
    :param name_match_level: 'pool', 'backend', or 'host' for name filter (as
                             defined in _filter_host method)
    :raise ClusterNotFound: If cluster doesn't exist.
    """
    query = _cluster_query(context, is_up, get_services, services_summary,
                           read_deleted, name_match_level, id=id, **filters)
    cluster = query.first() if query else None
    if cluster:
        return cluster
    raise exception.ClusterNotFound(id=id or str(filters))
@require_admin_context
def cluster_get_all(context, is_up=None, get_services=False,
                    services_summary=False, read_deleted='no',
                    name_match_level=None, **filters):
    """Get all clusters that match the criteria.

    :param is_up: Boolean value to filter based on the cluster's up status.
    :param get_services: If we want to load all services from this cluster.
    :param services_summary: If we want to load num_hosts and
                             num_down_hosts fields.
    :param read_deleted: Filtering based on delete status. Default value is
                         "no".
    :param name_match_level: 'pool', 'backend', or 'host' for name filter (as
                             defined in _filter_host method)
    :param filters: Field based filters in the form of key/value.
    """
    query = _cluster_query(context, is_up, get_services, services_summary,
                           read_deleted, name_match_level, **filters)
    if not query:
        # An invalid filter was supplied; there can be no matches.
        return []
    return query.all()
@require_admin_context
def cluster_create(context, values):
    """Create a cluster from the values dictionary."""
    cluster = models.Cluster()
    cluster.update(values)
    # Provided disabled value takes precedence over the config default.
    if values.get('disabled') is None:
        cluster.disabled = not CONF.enable_new_services
    session = get_session()
    try:
        with session.begin():
            cluster.save(session)
            # We mark that newly created cluster has no hosts to prevent
            # problems at the OVO level
            cluster.last_heartbeat = None
            return cluster
    except db_exc.DBDuplicateEntry:
        # Race: another non-deleted cluster with the same name exists.
        raise exception.ClusterExists(name=values.get('name'))
@require_admin_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def cluster_update(context, id, values):
    """Set the given properties on a cluster and update it.

    :raises ClusterNotFound: if the cluster does not exist.
    """
    updated = _cluster_query(context, id=id).update(values)
    if not updated:
        raise exception.ClusterNotFound(id=id)
@require_admin_context
def cluster_destroy(context, id):
    """Destroy the cluster or raise if it does not exist or has hosts.

    :raises ClusterNotFound: if no cluster with `id` exists.
    :raises ClusterHasHosts: if the cluster still has hosts attached.
    """
    query = _cluster_query(context, id=id)
    # Only clusters with no attached hosts may be deleted.
    query = query.filter(models.Cluster.num_hosts == 0)
    # If the update doesn't succeed we don't know if it's because the
    # cluster doesn't exist or because it has hosts.
    result = query.update(models.Cluster.delete_values(),
                          synchronize_session=False)
    if not result:
        # This will fail if the cluster doesn't exist raising the right
        # exception
        cluster_get(context, id=id)
        # If it doesn't fail, then the problem is that there are hosts
        raise exception.ClusterHasHosts(id=id)
###################
def _metadata_refs(metadata_dict, meta_class):
metadata_refs = []
if metadata_dict:
for k, v in metadata_dict.items():
metadata_ref = meta_class()
metadata_ref['key'] = k
metadata_ref['value'] = v
metadata_refs.append(metadata_ref)
return metadata_refs
def _dict_with_extra_specs_if_authorized(context, inst_type_query):
"""Convert type query result to dict with extra_spec and rate_limit.
Takes a volume type query returned by sqlalchemy and returns it
as a dictionary, converting the extra_specs entry from a list
of dicts. NOTE the contents of extra-specs are admin readable
only. If the context passed in for this request is not admin
then we will return an empty extra-specs dict rather than
providing the admin only details.
Example response with admin context:
'extra_specs' : [{'key': 'k1', 'value': 'v1', ...}, ...]
to a single dict:
'extra_specs' : {'k1': 'v1'}
"""
inst_type_dict = dict(inst_type_query)
extra_specs = {x['key']: x['value']
for x in inst_type_query['extra_specs']}
inst_type_dict['extra_specs'] = extra_specs
return inst_type_dict
###################
def _dict_with_group_specs_if_authorized(context, inst_type_query):
    """Convert a group type query result to a dict with a group_specs dict.

    Takes a group type query returned by sqlalchemy and returns it
    as a dictionary, converting the group_specs entry from a list
    of dicts:

        'group_specs' : [{'key': 'k1', 'value': 'v1', ...}, ...]

    to a single dict:

        'group_specs' : {'k1': 'v1'}

    NOTE the contents of group-specs are admin readable only: for a
    non-admin context the group_specs entry is removed from the result
    instead of exposing the admin-only details.
    """
    inst_type_dict = dict(inst_type_query)
    if not is_admin_context(context):
        # `del` is a statement, not a function.
        del inst_type_dict['group_specs']
    else:
        group_specs = {x['key']: x['value']
                       for x in inst_type_query['group_specs']}
        inst_type_dict['group_specs'] = group_specs
    return inst_type_dict
###################
@require_context
def _quota_get(context, project_id, resource, session=None):
    """Fetch the quota row for (project_id, resource) or raise."""
    query = model_query(context, models.Quota, session=session,
                        read_deleted="no")
    result = query.filter_by(project_id=project_id,
                             resource=resource).first()
    if result is None:
        raise exception.ProjectQuotaNotFound(project_id=project_id)
    return result
@require_context
def quota_get(context, project_id, resource):
    """Return the quota row for (project_id, resource)."""
    quota = _quota_get(context, project_id, resource)
    return quota
@require_context
def quota_get_all_by_project(context, project_id):
    """Return {resource: hard_limit} for a project, plus its project_id."""
    query = model_query(context, models.Quota, read_deleted="no")
    rows = query.filter_by(project_id=project_id).all()
    result = {'project_id': project_id}
    for row in rows:
        result[row.resource] = row.hard_limit
    return result
@require_context
def _quota_get_all_by_resource(context, resource, session=None):
    """Return every non-deleted quota row for the given resource."""
    query = model_query(context, models.Quota, session=session,
                        read_deleted='no')
    return query.filter_by(resource=resource).all()
@require_context
def quota_create(context, project_id, resource, limit):
    """Create a per-project quota row with the given hard limit."""
    quota = models.Quota()
    quota.project_id = project_id
    quota.resource = resource
    quota.hard_limit = limit
    session = get_session()
    with session.begin():
        quota.save(session)
    return quota
@require_context
def quota_update(context, project_id, resource, limit):
    """Set a new hard limit on an existing project quota row."""
    session = get_session()
    with session.begin():
        quota = _quota_get(context, project_id, resource, session=session)
        quota.hard_limit = limit
        return quota
@require_context
def quota_update_resource(context, old_res, new_res):
    """Rename old_res to new_res on every project quota row."""
    session = get_session()
    with session.begin():
        for quota in _quota_get_all_by_resource(context, old_res,
                                                session=session):
            quota.resource = new_res
@require_admin_context
def quota_destroy(context, project_id, resource):
    """Soft-delete the quota row for (project_id, resource)."""
    session = get_session()
    with session.begin():
        quota = _quota_get(context, project_id, resource, session=session)
        return quota.delete(session=session)
###################
@require_context
def _quota_class_get(context, class_name, resource, session=None):
    """Fetch the quota-class row for (class_name, resource) or raise."""
    query = model_query(context, models.QuotaClass, session=session,
                        read_deleted="no")
    result = query.filter_by(class_name=class_name,
                             resource=resource).first()
    if result is None:
        raise exception.QuotaClassNotFound(class_name=class_name)
    return result
@require_context
def quota_class_get(context, class_name, resource):
    """Return the quota-class row for (class_name, resource)."""
    quota_class = _quota_class_get(context, class_name, resource)
    return quota_class
@require_context
def quota_class_get_defaults(context):
    """Return {resource: hard_limit} for the default quota class.

    The @require_context decorator was missing here, unlike every other
    quota_class_* accessor in this module.
    """
    rows = model_query(context, models.QuotaClass,
                       read_deleted="no").\
        filter_by(class_name=_DEFAULT_QUOTA_NAME).all()
    result = {'class_name': _DEFAULT_QUOTA_NAME}
    for row in rows:
        result[row.resource] = row.hard_limit
    return result
@require_context
def quota_class_get_all_by_name(context, class_name):
    """Return {resource: hard_limit} for a quota class, plus class_name."""
    rows = (model_query(context, models.QuotaClass, read_deleted="no")
            .filter_by(class_name=class_name)
            .all())
    result = {'class_name': class_name}
    for row in rows:
        result[row.resource] = row.hard_limit
    return result
@require_context
def _quota_class_get_all_by_resource(context, resource, session):
    """Return every non-deleted quota-class row for the given resource."""
    query = model_query(context, models.QuotaClass, session=session,
                        read_deleted="no")
    return query.filter_by(resource=resource).all()
@handle_db_data_error
@require_context
def quota_class_create(context, class_name, resource, limit):
    """Create a quota-class row with the given hard limit."""
    quota_class = models.QuotaClass()
    quota_class.class_name = class_name
    quota_class.resource = resource
    quota_class.hard_limit = limit
    session = get_session()
    with session.begin():
        quota_class.save(session)
    return quota_class
@require_context
def quota_class_update(context, class_name, resource, limit):
    """Set a new hard limit on an existing quota-class row."""
    session = get_session()
    with session.begin():
        quota_class = _quota_class_get(context, class_name, resource,
                                       session=session)
        quota_class.hard_limit = limit
        return quota_class
@require_context
def quota_class_update_resource(context, old_res, new_res):
    """Rename old_res to new_res on every quota-class row."""
    session = get_session()
    with session.begin():
        for quota_class in _quota_class_get_all_by_resource(
                context, old_res, session):
            quota_class.resource = new_res
@require_context
def quota_class_destroy(context, class_name, resource):
    """Soft-delete the quota-class row for (class_name, resource)."""
    session = get_session()
    with session.begin():
        quota_class = _quota_class_get(context, class_name, resource,
                                       session=session)
        return quota_class.delete(session=session)
@require_context
def quota_class_destroy_all_by_name(context, class_name):
    """Soft-delete every quota-class row belonging to a quota class."""
    session = get_session()
    with session.begin():
        query = model_query(context, models.QuotaClass,
                            session=session, read_deleted="no")
        for quota_class in query.filter_by(class_name=class_name).all():
            quota_class.delete(session=session)
###################
@require_context
def quota_usage_get(context, project_id, resource):
    """Return the usage row for (project_id, resource) or raise."""
    usage = (model_query(context, models.QuotaUsage, read_deleted="no")
             .filter_by(project_id=project_id)
             .filter_by(resource=resource)
             .first())
    if usage is None:
        raise exception.QuotaUsageNotFound(project_id=project_id)
    return usage
@require_context
def quota_usage_get_all_by_project(context, project_id):
    """Return per-resource usage for a project.

    Maps each resource to {'in_use': ..., 'reserved': ...} and also carries
    the project_id key.
    """
    usages = (model_query(context, models.QuotaUsage, read_deleted="no")
              .filter_by(project_id=project_id)
              .all())
    result = {'project_id': project_id}
    for usage in usages:
        result[usage.resource] = dict(in_use=usage.in_use,
                                      reserved=usage.reserved)
    return result
@require_admin_context
def _quota_usage_create(context, project_id, resource, in_use, reserved,
                        until_refresh, session=None):
    """Insert a new QuotaUsage row and return it."""
    usage = models.QuotaUsage()
    usage.project_id = project_id
    usage.resource = resource
    usage.in_use = in_use
    usage.reserved = reserved
    usage.until_refresh = until_refresh
    usage.save(session=session)
    return usage
###################
def _reservation_create(context, uuid, usage, project_id, resource, delta,
                        expire, session=None):
    """Insert a new Reservation row tied to the given usage (may be None)."""
    reservation = models.Reservation()
    reservation.uuid = uuid
    # A reservation may be created before its usage row exists.
    reservation.usage_id = usage['id'] if usage else None
    reservation.project_id = project_id
    reservation.resource = resource
    reservation.delta = delta
    reservation.expire = expire
    reservation.save(session=session)
    return reservation
###################
# NOTE(johannes): The quota code uses SQL locking to ensure races don't
# cause under or over counting of resources. To avoid deadlocks, this
# code always acquires the lock on quota_usages before acquiring the lock
# on reservations.
def _get_quota_usages(context, session, project_id, resources=None):
    """Return {resource: QuotaUsage} for a project with rows locked.

    Rows are ordered by id and selected FOR UPDATE to avoid quota races.
    Broken out from quota_reserve for testability.

    :param resources: optional iterable restricting which resources to load.
    """
    usage_query = model_query(context, models.QuotaUsage,
                              read_deleted="no",
                              session=session)
    usage_query = usage_query.filter_by(project_id=project_id)
    if resources:
        usage_query = usage_query.filter(
            models.QuotaUsage.resource.in_(list(resources)))
    locked_rows = (usage_query.order_by(models.QuotaUsage.id.asc())
                   .with_for_update().all())
    return {row.resource: row for row in locked_rows}
def _get_quota_usages_by_resource(context, session, resource):
    """Return all non-deleted usage rows for a resource, locked FOR UPDATE.

    Rows are ordered by id to keep lock acquisition order deterministic.
    """
    # NOTE: this must be read_deleted="no", not `deleted="no"`: model_query
    # only honors the read_deleted kwarg, so the previous `deleted` kwarg was
    # silently ignored and the context's read_deleted setting applied
    # instead, potentially including soft-deleted rows.
    rows = model_query(context, models.QuotaUsage,
                       read_deleted="no",
                       session=session).\
        filter_by(resource=resource).\
        order_by(models.QuotaUsage.id.asc()).\
        with_for_update().\
        all()
    return rows
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def quota_usage_update_resource(context, old_res, new_res):
    """Rename all quota usage rows for resource *old_res* to *new_res*.

    Each renamed row gets until_refresh set to 1 so that the next
    reservation against it forces a usage re-sync (see quota_reserve's
    until_refresh countdown handling).
    """
    session = get_session()
    with session.begin():
        usages = _get_quota_usages_by_resource(context, session, old_res)
        for usage in usages:
            usage.resource = new_res
            usage.until_refresh = 1
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def quota_reserve(context, resources, quotas, deltas, expire,
                  until_refresh, max_age, project_id=None):
    """Create reservations atomically against a project's quota usages.

    Locks the project's usage rows, refreshes any stale or desynced usage
    counters via each resource's registered sync function, rejects requests
    whose positive deltas would exceed a quota, and records one Reservation
    row per resource delta.

    :param resources: mapping of resource name -> resource object; supplies
        the sync routine name and optional volume type attributes.
    :param quotas: mapping of resource name -> hard limit (negative means
        unlimited).
    :param deltas: mapping of resource name -> requested usage change.
    :param expire: expiry datetime for the created reservations.
    :param until_refresh: count of reservations until a forced usage
        refresh, or a false value to disable countdown-based refresh.
    :param max_age: seconds after which a usage record is considered stale
        and refreshed (0/None disables age-based refresh).
    :param project_id: project to reserve against; defaults to the
        context's project.
    :returns: list of reservation UUIDs.
    :raises exception.OverQuota: if any non-negative delta would push a
        resource past its quota.
    """
    elevated = context.elevated()
    session = get_session()
    with session.begin():
        if project_id is None:
            project_id = context.project_id

        # Get the current usages
        usages = _get_quota_usages(context, session, project_id,
                                   resources=deltas.keys())

        # Handle usage refresh
        work = set(deltas.keys())
        while work:
            resource = work.pop()

            # Do we need to refresh the usage?
            refresh = False
            if resource not in usages:
                usages[resource] = _quota_usage_create(elevated,
                                                       project_id,
                                                       resource,
                                                       0, 0,
                                                       until_refresh or None,
                                                       session=session)
                refresh = True
            elif usages[resource].in_use < 0:
                # Negative in_use count indicates a desync, so try to
                # heal from that...
                refresh = True
            elif usages[resource].until_refresh is not None:
                usages[resource].until_refresh -= 1
                if usages[resource].until_refresh <= 0:
                    refresh = True
            elif max_age and usages[resource].updated_at is not None and (
                    (timeutils.utcnow() -
                     usages[resource].updated_at).total_seconds() >= max_age):
                refresh = True

            # OK, refresh the usage
            if refresh:
                # Grab the sync routine
                sync = QUOTA_SYNC_FUNCTIONS[resources[resource].sync]
                volume_type_id = getattr(resources[resource],
                                         'volume_type_id', None)
                volume_type_name = getattr(resources[resource],
                                           'volume_type_name', None)
                updates = sync(elevated, project_id,
                               volume_type_id=volume_type_id,
                               volume_type_name=volume_type_name,
                               session=session)
                for res, in_use in updates.items():
                    # Make sure we have a destination for the usage!
                    if res not in usages:
                        usages[res] = _quota_usage_create(
                            elevated,
                            project_id,
                            res,
                            0, 0,
                            until_refresh or None,
                            session=session
                        )

                    # Update the usage
                    usages[res].in_use = in_use
                    usages[res].until_refresh = until_refresh or None

                    # Because more than one resource may be refreshed
                    # by the call to the sync routine, and we don't
                    # want to double-sync, we make sure all refreshed
                    # resources are dropped from the work set.
                    work.discard(res)

                    # NOTE(Vek): We make the assumption that the sync
                    #            routine actually refreshes the
                    #            resources that it is the sync routine
                    #            for.  We don't check, because this is
                    #            a best-effort mechanism.

            # There are 3 cases where we want to update "until_refresh" in the
            # DB: when we enabled it, when we disabled it, and when we changed
            # to a value lower than the current remaining value.
            else:
                res_until = usages[resource].until_refresh
                if ((res_until is None and until_refresh) or
                        ((res_until or 0) > (until_refresh or 0))):
                    usages[resource].until_refresh = until_refresh or None

        # Check for deltas that would go negative
        unders = [r for r, delta in deltas.items()
                  if delta < 0 and delta + usages[r].in_use < 0]

        # TODO(mc_nair): Should ignore/zero alloc if using non-nested driver

        # Now, let's check the quotas
        # NOTE(Vek): We're only concerned about positive increments.
        #            If a project has gone over quota, we want them to
        #            be able to reduce their usage without any
        #            problems.
        overs = [r for r, delta in deltas.items()
                 if quotas[r] >= 0 and delta >= 0 and
                 quotas[r] < delta + usages[r].total]

        # NOTE(Vek): The quota check needs to be in the transaction,
        #            but the transaction doesn't fail just because
        #            we're over quota, so the OverQuota raise is
        #            outside the transaction.  If we did the raise
        #            here, our usage updates would be discarded, but
        #            they're not invalidated by being over-quota.

        # Create the reservations
        if not overs:
            reservations = []
            for resource, delta in deltas.items():
                usage = usages[resource]
                reservation = _reservation_create(
                    elevated, str(uuid.uuid4()), usage, project_id, resource,
                    delta, expire, session=session)
                reservations.append(reservation.uuid)

                # Also update the reserved quantity
                # NOTE(Vek): Again, we are only concerned here about
                #            positive increments.  Here, though, we're
                #            worried about the following scenario:
                #
                #            1) User initiates resize down.
                #            2) User allocates a new instance.
                #            3) Resize down fails or is reverted.
                #            4) User is now over quota.
                #
                #            To prevent this, we only update the
                #            reserved value if the delta is positive.
                if delta > 0:
                    usages[resource].reserved += delta

    if unders:
        LOG.warning("Change will make usage less than 0 for the following "
                    "resources: %s", unders)
    if overs:
        usages = {k: dict(in_use=v.in_use, reserved=v.reserved)
                  for k, v in usages.items()}
        raise exception.OverQuota(overs=sorted(overs), quotas=quotas,
                                  usages=usages)

    return reservations
def _quota_reservations(session, context, reservations):
    """Return the reservation rows for the given UUIDs, locked for update."""
    query = model_query(context, models.Reservation,
                        read_deleted="no",
                        session=session)
    query = query.filter(models.Reservation.uuid.in_(reservations))
    return query.with_for_update().all()
def _get_reservation_resources(session, context, reservation_ids):
    """Return the set of resource names covered by the given reservations."""
    query = (model_query(context, models.Reservation,
                         read_deleted="no",
                         session=session)
             # Only the 'resource' column is needed here.
             .options(load_only('resource'))
             .filter(models.Reservation.uuid.in_(reservation_ids)))
    return {row.resource for row in query.all()}
def _dict_with_usage_id(usages):
return {row.id: row for row in usages.values()}
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def reservation_commit(context, reservations, project_id=None):
    """Commit reservations: fold reserved amounts into in_use.

    For each reservation UUID, a non-negative delta is moved from the usage
    row's 'reserved' counter into 'in_use'; negative deltas were never
    reserved (see quota_reserve) so they only adjust 'in_use'.  Each
    reservation row is then soft-deleted.
    """
    session = get_session()
    with session.begin():
        usages = _get_quota_usages(
            context, session, project_id,
            resources=_get_reservation_resources(session, context,
                                                 reservations))
        usages = _dict_with_usage_id(usages)

        for reservation in _quota_reservations(session, context, reservations):
            usage = usages[reservation.usage_id]
            # Only non-negative deltas were added to 'reserved' at
            # reserve time, so only those are subtracted back out.
            if reservation.delta >= 0:
                usage.reserved -= reservation.delta
            usage.in_use += reservation.delta

            reservation.delete(session=session)
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def reservation_rollback(context, reservations, project_id=None):
    """Roll back reservations: release reserved amounts without committing.

    Non-negative deltas are subtracted back out of the usage row's
    'reserved' counter (negative deltas were never reserved); 'in_use' is
    left untouched.  Each reservation row is then soft-deleted.
    """
    session = get_session()
    with session.begin():
        usages = _get_quota_usages(
            context, session, project_id,
            resources=_get_reservation_resources(session, context,
                                                 reservations))
        usages = _dict_with_usage_id(usages)
        for reservation in _quota_reservations(session, context, reservations):
            usage = usages[reservation.usage_id]
            if reservation.delta >= 0:
                usage.reserved -= reservation.delta

            reservation.delete(session=session)
def quota_destroy_by_project(*args, **kwargs):
    """Destroy all limit quotas associated with a project.

    Leaves usage and reservation quotas intact.
    """
    quota_destroy_all_by_project(*args, only_quotas=True, **kwargs)
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def quota_destroy_all_by_project(context, project_id, only_quotas=False):
    """Destroy all quotas associated with a project.

    This includes limit quotas, usage quotas and reservation quotas.
    Optionally can only remove limit quotas and leave other types as they are.

    :param context: The request context, for access checks.
    :param project_id: The ID of the project being deleted.
    :param only_quotas: Only delete limit quotas, leave other types intact.
    """
    session = get_session()
    with session.begin():
        # Limit quotas are always removed; usages and reservations only
        # when the caller asked for a full purge.
        model_classes = [models.Quota]
        if not only_quotas:
            model_classes += [models.QuotaUsage, models.Reservation]

        for model_class in model_classes:
            rows = model_query(context, model_class, session=session,
                               read_deleted="no").\
                filter_by(project_id=project_id).\
                all()
            for row in rows:
                row.delete(session=session)
@require_admin_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def reservation_expire(context):
    """Purge expired reservations and release their reserved quota.

    For every non-deleted reservation whose expire time has passed, the
    non-negative delta is subtracted back out of the linked usage row's
    'reserved' counter, then the reservation is soft-deleted.
    """
    session = get_session()
    with session.begin():
        current_time = timeutils.utcnow()
        results = model_query(context, models.Reservation, session=session,
                              read_deleted="no").\
            filter(models.Reservation.expire < current_time).\
            all()

        if results:
            for reservation in results:
                # Only non-negative deltas were ever added to 'reserved'.
                if reservation.delta >= 0:
                    reservation.usage.reserved -= reservation.delta
                    reservation.usage.save(session=session)

                reservation.delete(session=session)
###################
@require_admin_context
def volume_attach(context, values):
    """Create a new volume attachment record.

    Generates an id when the caller did not supply one, saves the row, and
    returns the freshly-loaded attachment (with the volume relationship
    eager-loaded by _attachment_get).
    """
    volume_attachment_ref = models.VolumeAttachment()
    if not values.get('id'):
        values['id'] = str(uuid.uuid4())

    volume_attachment_ref.update(values)
    session = get_session()
    with session.begin():
        volume_attachment_ref.save(session=session)
        return _attachment_get(context, values['id'],
                               session=session)
@require_admin_context
def volume_attached(context, attachment_id, instance_uuid, host_name,
                    mountpoint, attach_mode, mark_attached):
    """This method updates a volume attachment entry.

    This function saves the information related to a particular
    attachment for a volume.  It also updates the volume record
    to mark the volume as attached or attaching.

    The mark_attached argument is a boolean, when set to True,
    we mark the volume as 'in-use' and the 'attachment' as
    'attached', if False, we use 'attaching' for both of these
    status settings.

    :returns: tuple of (updated volume row, dict of attachment updates
        applied, without the 'updated_at' entry).
    :raises exception.InvalidUUID: if instance_uuid is set but malformed.
    """
    attach_status = fields.VolumeAttachStatus.ATTACHED
    volume_status = 'in-use'
    if not mark_attached:
        attach_status = fields.VolumeAttachStatus.ATTACHING
        volume_status = 'attaching'

    if instance_uuid and not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    session = get_session()
    with session.begin():
        volume_attachment_ref = _attachment_get(context, attachment_id,
                                                session=session)
        # NOTE(review): the pre-update 'updated_at' is carried in the
        # update dict and stripped from the returned copy below --
        # presumably so the ORM update keeps a consistent timestamp;
        # confirm against callers before relying on it.
        updated_values = {'mountpoint': mountpoint,
                          'attach_status': attach_status,
                          'instance_uuid': instance_uuid,
                          'attached_host': host_name,
                          'attach_time': timeutils.utcnow(),
                          'attach_mode': attach_mode,
                          'updated_at': volume_attachment_ref.updated_at}
        volume_attachment_ref.update(updated_values)
        volume_attachment_ref.save(session=session)
        del updated_values['updated_at']

        volume_ref = _volume_get(context, volume_attachment_ref['volume_id'],
                                 session=session)
        volume_ref['status'] = volume_status
        volume_ref['attach_status'] = attach_status
        volume_ref.save(session=session)
        return (volume_ref, updated_values)
@handle_db_data_error
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def volume_create(context, values):
    """Create a new volume row along with its metadata rows.

    'metadata' in *values* is expanded into VolumeMetadata rows; admin
    metadata is only honored when the context is an admin context,
    otherwise any pre-built 'volume_admin_metadata' entry is dropped.
    An id is generated when the caller did not supply one.

    :returns: the freshly-loaded volume (via _volume_get).
    """
    values['volume_metadata'] = _metadata_refs(values.get('metadata'),
                                               models.VolumeMetadata)
    if is_admin_context(context):
        values['volume_admin_metadata'] = \
            _metadata_refs(values.get('admin_metadata'),
                           models.VolumeAdminMetadata)
    elif values.get('volume_admin_metadata'):
        del values['volume_admin_metadata']

    volume_ref = models.Volume()
    if not values.get('id'):
        values['id'] = str(uuid.uuid4())
    volume_ref.update(values)

    session = get_session()
    with session.begin():
        session.add(volume_ref)

    return _volume_get(context, values['id'], session=session)
def get_booleans_for_table(table_name):
    """Return the names of the Boolean columns on the model for *table_name*."""
    model = getattr(models, table_name.capitalize())
    if not hasattr(model, '__table__'):
        return set()
    return {column.name
            for column in model.__table__.columns
            if isinstance(column.type, sqltypes.Boolean)}
@require_admin_context
def volume_data_get_for_host(context, host, count_only=False):
    """Return volume stats for a host.

    Matches both the exact host value and pool-qualified values
    ('host#pool').  Returns just the count when count_only is True,
    otherwise a (count, total_size) tuple.
    """
    host_attr = models.Volume.host
    host_filter = or_(host_attr == host, host_attr.op('LIKE')(host + '#%'))
    if count_only:
        row = model_query(context,
                          func.count(models.Volume.id),
                          read_deleted="no").filter(host_filter).first()
        return row[0] or 0
    row = model_query(context,
                      func.count(models.Volume.id),
                      func.sum(models.Volume.size),
                      read_deleted="no").filter(host_filter).first()
    # NOTE(vish): convert None to 0
    return (row[0] or 0, row[1] or 0)
@require_admin_context
def _volume_data_get_for_project(context, project_id, volume_type_id=None,
                                 session=None, host=None, skip_internal=True):
    """Return (volume count, total size) for a project.

    :param volume_type_id: if set, restrict the count to one volume type.
    :param host: if set, restrict the count to one backend host.
    :param skip_internal: when True (the quota-accounting case), exclude
        migration-target volumes and temporary volumes, since those were
        never charged against quotas or reservations.
    """
    model = models.Volume
    query = model_query(context,
                        func.count(model.id),
                        func.sum(model.size),
                        read_deleted="no",
                        session=session).\
        filter_by(project_id=project_id)

    # When calling the method for quotas we don't count volumes that are the
    # destination of a migration since they were not accounted for quotas or
    # reservations in the first place.
    # Also skip temporary volumes that have 'temporary' admin_metadata key set
    # to True.
    if skip_internal:
        admin_model = models.VolumeAdminMetadata
        # Anti-join: no non-deleted admin metadata row marking the volume
        # as temporary may exist.
        query = query.filter(
            and_(or_(model.migration_status.is_(None),
                     ~model.migration_status.startswith('target:')),
                 ~sql.exists().where(and_(model.id == admin_model.volume_id,
                                          ~admin_model.deleted,
                                          admin_model.key == 'temporary',
                                          admin_model.value == 'True')
                                     )
                 )
        )

    if host:
        query = query.filter(_filter_host(model.host, host))
    if volume_type_id:
        query = query.filter_by(volume_type_id=volume_type_id)
    result = query.first()

    # NOTE(vish): convert None to 0
    return (result[0] or 0, result[1] or 0)
@require_admin_context
def _backup_data_get_for_project(context, project_id, volume_type_id=None,
                                 session=None):
    """Return (backup count, total backup size) for a project."""
    query = (model_query(context,
                         func.count(models.Backup.id),
                         func.sum(models.Backup.size),
                         read_deleted="no",
                         session=session)
             .filter_by(project_id=project_id))
    if volume_type_id:
        query = query.filter_by(volume_type_id=volume_type_id)
    count, total_size = query.first()

    # NOTE(vish): convert None to 0
    return (count or 0, total_size or 0)
@require_admin_context
def volume_data_get_for_project(context, project_id, host=None):
    """Return (volume count, total size) for a project.

    Unlike the quota-accounting variant, this counts every volume,
    including migration targets and temporary volumes
    (skip_internal=False).
    """
    return _volume_data_get_for_project(context, project_id, host=host,
                                        skip_internal=False)
# Models whose rows are soft-deleted alongside their parent volume in
# volume_destroy().
VOLUME_DEPENDENT_MODELS = frozenset([models.VolumeMetadata,
                                     models.VolumeAdminMetadata,
                                     models.Transfer,
                                     models.VolumeGlanceMetadata,
                                     models.VolumeAttachment])
@require_admin_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def volume_destroy(context, volume_id):
    """Soft-delete a volume and all its dependent rows.

    Marks the volume deleted and clears its migration status, then
    soft-deletes every row in VOLUME_DEPENDENT_MODELS that references it.

    :returns: the update dict applied to the volume row (without the
        'updated_at' entry).
    """
    session = get_session()
    now = timeutils.utcnow()
    updated_values = {'status': 'deleted',
                      'deleted': True,
                      'deleted_at': now,
                      'migration_status': None}
    with session.begin():
        query = model_query(context, models.Volume, session=session).\
            filter_by(id=volume_id)
        entity = query.column_descriptions[0]['entity']
        # Assigning the column to itself (entity.updated_at is the column
        # expression) keeps updated_at unchanged by the bulk UPDATE.
        updated_values['updated_at'] = entity.updated_at
        query.update(updated_values)
        for model in VOLUME_DEPENDENT_MODELS:
            query = model_query(context, model, session=session).\
                filter_by(volume_id=volume_id)
            entity = query.column_descriptions[0]['entity']
            query.update({'deleted': True,
                          'deleted_at': now,
                          'updated_at': entity.updated_at})
    del updated_values['updated_at']
    return updated_values
def _include_in_cluster(context, cluster, model, partial_rename, filters):
    """Generic include in cluster method.

    When we include resources in a cluster we have to be careful to preserve
    the addressing sections that have not been provided.  That's why we allow
    partial_renaming, so we can preserve the backend and pool if we are only
    providing host/cluster level information, and preserve pool information if
    we only provide backend level information.

    For example when we include a host in a cluster we receive calls with
    filters like {'host': 'localhost@lvmdriver-1'} and cluster with something
    like 'mycluster@lvmdriver-1'.  Since in the DB the resources will have the
    host field set to something like 'localhost@lvmdriver-1#lvmdriver-1' we
    want to include original pool in the new cluster_name.  So we want to store
    in cluster_name value 'mycluster@lvmdriver-1#lvmdriver-1'.

    :returns: number of rows updated, or None if a filter key does not
        exist on the model.
    """
    filters = _clean_filters(filters)
    if filters and not is_valid_model_filters(model, filters):
        return None

    query = get_session().query(model)
    if hasattr(model, 'deleted'):
        query = query.filter_by(deleted=False)

    # cluster_name and host are special filter cases
    for field in {'cluster_name', 'host'}.intersection(filters):
        value = filters.pop(field)
        # We do a special backend filter
        query = query.filter(_filter_host(getattr(model, field), value))
        # If we want to do a partial rename and we haven't set the cluster
        # already, the value we want to set is a SQL replace of existing field
        # value.
        if partial_rename and isinstance(cluster, str):
            # After this, 'cluster' is a SQL expression, so the isinstance
            # check above only triggers on the first matching field.
            cluster = func.replace(getattr(model, field), value, cluster)

    query = query.filter_by(**filters)
    result = query.update({'cluster_name': cluster}, synchronize_session=False)
    return result
@require_admin_context
def volume_include_in_cluster(context, cluster, partial_rename=True,
                              **filters):
    """Include all volumes matching the filters into a cluster.

    :param cluster: cluster name to set (may be partial; see
        _include_in_cluster for the partial-rename semantics).
    :param partial_rename: preserve the addressing sections (backend/pool)
        not provided in the filter value.
    :param filters: field/value filters selecting the volumes to move.
    :returns: number of rows updated, or None if a filter key was invalid.
    """
    return _include_in_cluster(context, cluster, models.Volume,
                               partial_rename, filters)
@require_admin_context
def volume_detached(context, volume_id, attachment_id):
    """This updates a volume attachment and marks it as detached.

    This method also ensures that the volume entry is correctly
    marked as either still attached/in-use or detached/available
    if this was the last detachment made.

    :returns: tuple of (volume update dict, attachment update dict or None
        if the attachment no longer exists); neither dict contains
        'updated_at'.
    """

    # NOTE(jdg): This is a funky band-aid for the earlier attempts at
    # multiattach, it's a bummer because these things aren't really being used
    # but at the same time we don't want to break them until we work out the
    # new proposal for multi-attach
    remain_attachment = True
    session = get_session()
    with session.begin():
        try:
            attachment = _attachment_get(context, attachment_id,
                                         session=session)
        except exception.VolumeAttachmentNotFound:
            attachment_updates = None
            attachment = None

        if attachment:
            now = timeutils.utcnow()
            attachment_updates = {
                'attach_status': fields.VolumeAttachStatus.DETACHED,
                'detach_time': now,
                'deleted': True,
                'deleted_at': now,
                'updated_at': attachment.updated_at,
            }
            attachment.update(attachment_updates)
            attachment.save(session=session)
            del attachment_updates['updated_at']

        attachment_list = None
        volume_ref = _volume_get(context, volume_id,
                                 session=session)
        volume_updates = {'updated_at': volume_ref.updated_at}
        if not volume_ref.volume_attachment:
            # NOTE(jdg): We kept the old arg style allowing session exclusively
            # for this one call
            attachment_list = volume_attachment_get_all_by_volume_id(
                context, volume_id, session=session)
            remain_attachment = False
        if attachment_list and len(attachment_list) > 0:
            remain_attachment = True

        if not remain_attachment:
            # Hide status update from user if we're performing volume migration
            # or uploading it to image
            if ((not volume_ref.migration_status and
                    not (volume_ref.status == 'uploading')) or
                    volume_ref.migration_status in ('success', 'error')):
                volume_updates['status'] = 'available'

            volume_updates['attach_status'] = (
                fields.VolumeAttachStatus.DETACHED)
        else:
            # Volume is still attached
            volume_updates['status'] = 'in-use'
            volume_updates['attach_status'] = (
                fields.VolumeAttachStatus.ATTACHED)

        volume_ref.update(volume_updates)
        volume_ref.save(session=session)
        del volume_updates['updated_at']
        return (volume_updates, attachment_updates)
def _process_model_like_filter(model, query, filters):
    """Applies regex expression filtering to a query.

    :param model: model to apply filters to
    :param query: query to apply filters to
    :param filters: dictionary of filters with regex values
    :returns: the updated query.
    """
    if query is None:
        return query

    for key in sorted(filters):
        column_attr = getattr(model, key)
        # Plain Python properties cannot be turned into SQL expressions.
        if type(column_attr).__name__ == 'property':
            continue
        value = filters[key]
        if isinstance(value, (str, int)):
            query = query.filter(
                column_attr.op('LIKE')(u'%%%s%%' % value))
    return query
def apply_like_filters(model):
    """Decorator factory that splits out inexact ('key~') filters.

    Filter keys ending in '~' request an inexact (LIKE) match: they are
    removed from the dict handed to the wrapped exact-filter function and
    applied afterwards via _process_model_like_filter().

    :param model: the model whose columns the inexact filters address.
    """
    def decorator_filters(process_exact_filters):
        # Bug fix: preserve the wrapped function's name/docstring so the
        # decorated filter processors remain introspectable.
        @functools.wraps(process_exact_filters)
        def _decorator(query, filters):
            exact_filters = filters.copy()
            regex_filters = {}
            for key, value in filters.items():
                # NOTE(tommylikehu): For inexact match, the filter keys
                # are in the format of 'key~=value'
                if key.endswith('~'):
                    exact_filters.pop(key)
                    regex_filters[key.rstrip('~')] = value
            query = process_exact_filters(query, exact_filters)
            return _process_model_like_filter(model, query, regex_filters)
        return _decorator
    return decorator_filters
@require_context
def _volume_get_query(context, session=None, project_only=False,
                      joined_load=True):
    """Get the query to retrieve the volume.

    :param context: the context used to run the method _volume_get_query
    :param session: the session to use
    :param project_only: the boolean used to decide whether to query the
                         volume in the current project or all projects
    :param joined_load: the boolean used to decide whether the query loads
                        the other models, which join the volume model in
                        the database. Currently, the False value for this
                        parameter is specially for the case of updating
                        database during volume migration
    :returns: updated query or None
    """
    query = model_query(context, models.Volume, session=session,
                        project_only=project_only)
    if not joined_load:
        return query

    relationships = ['volume_metadata']
    # Admin metadata is only visible in an admin context.
    if is_admin_context(context):
        relationships.append('volume_admin_metadata')
    relationships += ['volume_type', 'volume_attachment',
                      'consistencygroup', 'group']
    for relationship in relationships:
        query = query.options(joinedload(relationship))
    return query
@require_context
def _volume_get(context, volume_id, session=None, joined_load=True):
    """Fetch one volume by id or raise VolumeNotFound."""
    query = _volume_get_query(context, session=session, project_only=True,
                              joined_load=joined_load)
    if joined_load:
        query = query.options(joinedload('volume_type.extra_specs'))
    volume = query.filter_by(id=volume_id).first()
    if volume is None:
        raise exception.VolumeNotFound(volume_id=volume_id)
    return volume
def _attachment_get_all(context, filters=None, marker=None, limit=None,
                        offset=None, sort_keys=None, sort_dirs=None):
    """Return attachment records matching the filters, paginated."""
    if filters and not is_valid_model_filters(models.VolumeAttachment,
                                              filters,
                                              exclude_list=['project_id']):
        return []

    session = get_session()
    with session.begin():
        # Generate the paginate query
        query = _generate_paginate_query(context, session, marker,
                                         limit, sort_keys, sort_dirs, filters,
                                         offset, models.VolumeAttachment)
        return query.all() if query is not None else []
def _attachment_get(context, attachment_id, session=None, read_deleted=False,
                    project_only=True):
    """Fetch one attachment row (volume eager-loaded) or raise."""
    query = model_query(context, models.VolumeAttachment, session=session,
                        read_deleted=read_deleted)
    result = query.filter_by(id=attachment_id).\
        options(joinedload('volume')).\
        first()

    if result is None:
        raise exception.VolumeAttachmentNotFound(filter='attachment_id = %s' %
                                                 attachment_id)
    return result
def _attachment_get_query(context, session=None, project_only=False):
    """Base attachment query with the volume relationship eager-loaded."""
    query = model_query(context, models.VolumeAttachment, session=session,
                        project_only=project_only)
    return query.options(joinedload('volume'))
@apply_like_filters(model=models.VolumeAttachment)
def _process_attachment_filters(query, filters):
    """Apply exact-match attachment filters to *query*.

    Returns None when a filter key does not exist on the model.  The
    'project_id' filter is special-cased through a join against volumes.
    """
    if not filters:
        return query

    project_id = filters.pop('project_id', None)
    # Ensure that filters' keys exist on the model
    if not is_valid_model_filters(models.VolumeAttachment, filters):
        return None
    if project_id:
        volume_model = models.Volume
        query = query.filter(
            volume_model.id == models.VolumeAttachment.volume_id,
            volume_model.project_id == project_id)
    return query.filter_by(**filters)
@require_admin_context
def volume_attachment_get_all(context, filters=None, marker=None, limit=None,
                              offset=None, sort_keys=None, sort_dirs=None):
    """Retrieve all Attachment records with filter and pagination options."""
    return _attachment_get_all(context, filters=filters, marker=marker,
                               limit=limit, offset=offset,
                               sort_keys=sort_keys, sort_dirs=sort_dirs)
@require_context
def volume_attachment_get_all_by_volume_id(context, volume_id, session=None):
    """Return every non-detached attachment record for a volume."""
    not_detached = (models.VolumeAttachment.attach_status !=
                    fields.VolumeAttachStatus.DETACHED)
    return (model_query(context, models.VolumeAttachment, session=session)
            .filter_by(volume_id=volume_id)
            .filter(not_detached)
            .options(joinedload('volume'))
            .all())
@require_context
def volume_attachment_get_all_by_host(context, host):
    """Return every non-detached attachment record for a host."""
    session = get_session()
    with session.begin():
        not_detached = (models.VolumeAttachment.attach_status !=
                        fields.VolumeAttachStatus.DETACHED)
        return (model_query(context, models.VolumeAttachment, session=session)
                .filter_by(attached_host=host)
                .filter(not_detached)
                .options(joinedload('volume'))
                .all())
@require_context
def volume_attachment_get(context, attachment_id):
    """Fetch the specified attachment record."""
    attachment = _attachment_get(context, attachment_id)
    return attachment
@require_context
def volume_attachment_get_all_by_instance_uuid(context,
                                               instance_uuid):
    """Fetch all attachment records associated with the specified instance."""
    session = get_session()
    with session.begin():
        not_detached = (models.VolumeAttachment.attach_status !=
                        fields.VolumeAttachStatus.DETACHED)
        return (model_query(context, models.VolumeAttachment, session=session)
                .filter_by(instance_uuid=instance_uuid)
                .filter(not_detached)
                .options(joinedload('volume'))
                .all())
@require_context
def volume_attachment_get_all_by_project(context, project_id, filters=None,
                                         marker=None, limit=None, offset=None,
                                         sort_keys=None, sort_dirs=None):
    """Retrieve all Attachment records for specific project."""
    authorize_project_context(context, project_id)
    # Copy the caller's filters before forcing the project scope.
    project_filters = dict(filters) if filters else {}
    project_filters['project_id'] = project_id
    return _attachment_get_all(context, project_filters, marker,
                               limit, offset, sort_keys,
                               sort_dirs)
@require_admin_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def attachment_destroy(context, attachment_id):
    """Destroy the specified attachment record.

    Soft-deletes the attachment row and its attachment_specs rows.

    :returns: the update dict applied to the attachment (without the
        'updated_at' entry).
    """
    utcnow = timeutils.utcnow()
    session = get_session()
    with session.begin():
        query = model_query(context, models.VolumeAttachment,
                            session=session).filter_by(id=attachment_id)
        entity = query.column_descriptions[0]['entity']
        # Assigning the column to itself keeps updated_at unchanged by
        # the bulk UPDATE.
        updated_values = {'attach_status': fields.VolumeAttachStatus.DELETED,
                          'deleted': True,
                          'deleted_at': utcnow,
                          'updated_at': entity.updated_at}
        query.update(updated_values)

        query = model_query(context, models.AttachmentSpecs, session=session).\
            filter_by(attachment_id=attachment_id)
        entity = query.column_descriptions[0]['entity']
        query.update({'deleted': True,
                      'deleted_at': utcnow,
                      'updated_at': entity.updated_at})
    del updated_values['updated_at']
    return updated_values
def attachment_specs_exist(context):
    """Return True if any non-deleted attachment_specs rows exist."""
    first_row = model_query(context, models.AttachmentSpecs,
                            read_deleted='no').first()
    return first_row is not None
def _attachment_specs_query(context, attachment_id, session=None):
    """Base query over the non-deleted specs of a single attachment."""
    query = model_query(context, models.AttachmentSpecs, session=session,
                        read_deleted="no")
    return query.filter_by(attachment_id=attachment_id)
@require_context
def attachment_specs_get(context, attachment_id):
    """DEPRECATED: Fetch the attachment_specs for the specified attachment."""
    specs = {}
    for row in _attachment_specs_query(context, attachment_id).all():
        specs[row['key']] = row['value']
    return specs
@require_context
def attachment_specs_delete(context, attachment_id, key):
    """DEPRECATED: Delete attachment_specs for the specified attachment.

    :raises exception.AttachmentSpecsNotFound: if the key does not exist
        for the attachment.
    """
    session = get_session()
    with session.begin():
        # Raises AttachmentSpecsNotFound when the key is absent.
        _attachment_specs_get_item(context,
                                   attachment_id,
                                   key,
                                   session)
        query = _attachment_specs_query(context, attachment_id, session).\
            filter_by(key=key)
        entity = query.column_descriptions[0]['entity']
        # Soft-delete; assigning the column to itself leaves updated_at
        # unchanged by the bulk UPDATE.
        query.update({'deleted': True,
                      'deleted_at': timeutils.utcnow(),
                      'updated_at': entity.updated_at})
@require_context
def _attachment_specs_get_item(context,
                               attachment_id,
                               key,
                               session=None):
    """Return the spec row for (attachment_id, key) or raise."""
    spec = (_attachment_specs_query(context, attachment_id, session=session)
            .filter_by(key=key)
            .first())
    if spec is None:
        raise exception.AttachmentSpecsNotFound(
            specs_key=key,
            attachment_id=attachment_id)
    return spec
@handle_db_data_error
@require_context
def attachment_specs_update_or_create(context,
                                      attachment_id,
                                      specs):
    """DEPRECATED: Update attachment_specs for the specified attachment.

    Existing keys are updated in place; missing keys are created.

    :returns: the specs dict that was passed in.
    """
    session = get_session()
    with session.begin():
        spec_ref = None
        for key, value in specs.items():
            try:
                spec_ref = _attachment_specs_get_item(
                    context, attachment_id, key, session)
            except exception.AttachmentSpecsNotFound:
                spec_ref = models.AttachmentSpecs()
            spec_ref.update({"key": key, "value": value,
                             "attachment_id": attachment_id,
                             "deleted": False})
            spec_ref.save(session=session)

        return specs
@require_context
def volume_get(context, volume_id):
    """Return the volume with the given id.

    :raises exception.VolumeNotFound: if no matching volume exists.
    """
    return _volume_get(context, volume_id)
@require_admin_context
def volume_get_all(context, marker=None, limit=None, sort_keys=None,
                   sort_dirs=None, filters=None, offset=None):
    """Retrieves all volumes.

    If no sort parameters are specified then the returned volumes are sorted
    first by the 'created_at' key and then by the 'id' key in descending
    order.

    :param context: context to query under
    :param marker: the last item of the previous page, used to determine the
                   next page of results to return
    :param limit: maximum number of items to return
    :param sort_keys: list of attributes by which results should be sorted,
                      paired with corresponding item in sort_dirs
    :param sort_dirs: list of directions in which results should be sorted,
                      paired with corresponding item in sort_keys
    :param filters: dictionary of filters; values that are in lists, tuples,
                    or sets cause an 'IN' operation, while exact matching
                    is used for other values, see _process_volume_filters
                    function for more information
    :returns: list of matching volumes
    """
    session = get_session()
    with session.begin():
        query = _generate_paginate_query(context, session, marker, limit,
                                         sort_keys, sort_dirs, filters,
                                         offset)
        if query is None:
            # No volumes would match, return empty list
            return []
        return query.all()
@require_context
def get_volume_summary(context, project_only, filters=None):
    """Retrieves all volumes summary.

    :param context: context to query under
    :param project_only: limit summary to project volumes
    :param filters: dictionary of filters; values that are in lists, tuples,
                    or sets cause an 'IN' operation, while exact matching
                    is used for other values, see _process_volume_filters
                    function for more information
    :returns: tuple of (volume count, total size, dict mapping each
        metadata key to its list of distinct values)
    :raises exception.AdminRequired: for a non-project summary requested
        by a non-admin context.
    """
    if not (project_only or is_admin_context(context)):
        raise exception.AdminRequired()
    query = model_query(context, func.count(models.Volume.id),
                        func.sum(models.Volume.size), read_deleted="no")
    if project_only:
        query = query.filter_by(project_id=context.project_id)

    if filters:
        query = _process_volume_filters(query, filters)

    if query is None:
        return []

    result = query.first()

    query_metadata = model_query(
        context, models.VolumeMetadata.key, models.VolumeMetadata.value,
        read_deleted="no")
    if project_only:
        query_metadata = query_metadata.join(
            models.Volume,
            models.Volume.id == models.VolumeMetadata.volume_id).filter_by(
            project_id=context.project_id)
    result_metadata = query_metadata.distinct().all()

    # Group the distinct metadata values under their keys.
    result_metadata_list = collections.defaultdict(list)
    for key, value in result_metadata:
        result_metadata_list[key].append(value)

    return (result[0] or 0, result[1] or 0, result_metadata_list)
@require_admin_context
def volume_get_all_by_host(context, host, filters=None):
    """Retrieves all volumes hosted on a host.

    :param context: context to query under
    :param host: host for all volumes being retrieved
    :param filters: dictionary of filters; values that are in lists, tuples,
                    or sets cause an 'IN' operation, while exact matching
                    is used for other values, see _process_volume_filters
                    function for more information
    :returns: list of matching volumes
    """
    # As a side effect of the introduction of pool-aware scheduler,
    # newly created volumes will have pool information appended to
    # 'host' field of a volume record. So a volume record in DB can
    # now be either form below:
    #     Host
    #     Host#Pool
    #
    # Bug fix: a truthy non-string host previously fell through both
    # branches and implicitly returned None; callers expect a list.
    if not host or not isinstance(host, str):
        return []

    session = get_session()
    with session.begin():
        host_attr = getattr(models.Volume, 'host')
        conditions = [host_attr == host,
                      host_attr.op('LIKE')(host + '#%')]
        query = _volume_get_query(context).filter(or_(*conditions))
        if filters:
            query = _process_volume_filters(query, filters)
        # No volumes would match, return empty list
        if query is None:
            return []
        return query.all()
@require_context
def volume_get_all_by_group(context, group_id, filters=None):
    """Retrieves all volumes associated with the group_id.

    :param context: context to query under
    :param group_id: consistency group ID for all volumes being retrieved
    :param filters: dictionary of filters; values that are in lists, tuples,
                    or sets cause an 'IN' operation, while exact matching
                    is used for other values, see _process_volume_filters
                    function for more information
    :returns: list of matching volumes
    """
    vol_query = _volume_get_query(context).filter_by(
        consistencygroup_id=group_id)
    if filters:
        vol_query = _process_volume_filters(vol_query, filters)
        # A None result signals that the filters can never match anything.
        if vol_query is None:
            return []
    return vol_query.all()
@require_context
def volume_get_all_by_generic_group(context, group_id, filters=None):
    """Retrieves all volumes associated with the group_id.

    :param context: context to query under
    :param group_id: group ID for all volumes being retrieved
    :param filters: dictionary of filters; values that are in lists, tuples,
                    or sets cause an 'IN' operation, while exact matching
                    is used for other values, see _process_volume_filters
                    function for more information
    :returns: list of matching volumes
    """
    vol_query = _volume_get_query(context).filter_by(group_id=group_id)
    if filters:
        vol_query = _process_volume_filters(vol_query, filters)
        # A None result signals that the filters can never match anything.
        if vol_query is None:
            return []
    return vol_query.all()
@require_context
def volume_get_all_by_project(context, project_id, marker, limit,
                              sort_keys=None, sort_dirs=None, filters=None,
                              offset=None):
    """Retrieves all volumes in a project.

    If no sort parameters are specified then the returned volumes are sorted
    first by the 'created_at' key and then by the 'id' key in descending
    order.

    :param context: context to query under
    :param project_id: project for all volumes being retrieved
    :param marker: the last item of the previous page, used to determine the
                   next page of results to return
    :param limit: maximum number of items to return
    :param sort_keys: list of attributes by which results should be sorted,
                      paired with corresponding item in sort_dirs
    :param sort_dirs: list of directions in which results should be sorted,
                      paired with corresponding item in sort_keys
    :param filters: dictionary of filters; values that are in lists, tuples,
                    or sets cause an 'IN' operation, while exact matching
                    is used for other values, see _process_volume_filters
                    function for more information
    :returns: list of matching volumes
    """
    session = get_session()
    with session.begin():
        authorize_project_context(context, project_id)
        # Scope the query to the project on a copy so the caller's
        # filters dict is left untouched.
        effective_filters = dict(filters) if filters else {}
        effective_filters['project_id'] = project_id
        query = _generate_paginate_query(context, session, marker, limit,
                                         sort_keys, sort_dirs,
                                         effective_filters, offset)
        # A None query means the filters can never be satisfied.
        return [] if query is None else query.all()
def _generate_paginate_query(context, session, marker, limit, sort_keys,
                             sort_dirs, filters, offset=None,
                             paginate_type=models.Volume):
    """Generate the query to include the filters and the paginate options.

    Returns a query with sorting / pagination criteria added or None
    if the given filters will not yield any results.

    :param context: context to query under
    :param session: the session to use
    :param marker: the last item of the previous page; we returns the next
                    results after this value.
    :param limit: maximum number of items to return
    :param sort_keys: list of attributes by which results should be sorted,
                      paired with corresponding item in sort_dirs
    :param sort_dirs: list of directions in which results should be sorted,
                      paired with corresponding item in sort_keys
    :param filters: dictionary of filters; values that are in lists, tuples,
                    or sets cause an 'IN' operation, while exact matching
                    is used for other values, see _process_volume_filters
                    function for more information
    :param offset: number of items to skip
    :param paginate_type: type of pagination to generate
    :returns: updated query or None
    """
    # Each paginate type registers (query factory, filter processor,
    # single-object getter) in PAGINATION_HELPERS.
    query_factory, apply_filters, get_one = PAGINATION_HELPERS[paginate_type]

    sort_keys, sort_dirs = process_sort_params(sort_keys, sort_dirs,
                                               default_dir='desc')
    query = query_factory(context, session=session)

    if filters:
        query = apply_filters(query, filters)
        # The filter processor returns None when no row can ever match.
        if query is None:
            return None

    # Resolve the marker id to its DB object so paginate_query can anchor
    # the page after it.
    marker_object = (get_one(context, marker, session)
                     if marker is not None else None)

    return sqlalchemyutils.paginate_query(query, paginate_type, limit,
                                          sort_keys,
                                          marker=marker_object,
                                          sort_dirs=sort_dirs,
                                          offset=offset)
def calculate_resource_count(context, resource_type, filters):
    """Calculate total count with filters applied"""
    # EAFP lookup: unsupported resource types raise InvalidInput just as
    # the explicit membership test did.
    try:
        get_query, process_filters = CALCULATE_COUNT_HELPERS[resource_type]
    except KeyError:
        raise exception.InvalidInput(
            reason=_("Model %s doesn't support "
                     "counting resource.") % resource_type)

    session = get_session()
    query = get_query(context, session=session)
    if filters:
        query = process_filters(query, filters)
        # Filters that can never match yield a None query -> count is 0.
        if query is None:
            return 0
    return query.with_entities(func.count()).scalar()
@apply_like_filters(model=models.Volume)
def _process_volume_filters(query, filters):
    """Common filter processing for Volume queries.

    Filter values that are in lists, tuples, or sets cause an 'IN' operator
    to be used, while exact matching ('==' operator) is used for other values.

    A filter key/value of 'no_migration_targets'=True causes volumes with
    either a NULL 'migration_status' or a 'migration_status' that does not
    start with 'target:' to be retrieved.

    A 'metadata' filter key must correspond to a dictionary value of metadata
    key-value pairs.

    :param query: Model query to use
    :param filters: dictionary of filters
    :returns: updated query or None
    """
    # Copy so the pops below do not mutate the caller's dict.
    filters = filters.copy()
    # 'no_migration_targets' is unique, must be either NULL or
    # not start with 'target:'
    if filters.get('no_migration_targets', False):
        filters.pop('no_migration_targets')
        try:
            column_attr = getattr(models.Volume, 'migration_status')
            # NULL migration_status also counts as "not a migration target".
            conditions = [column_attr == None, # noqa
                          column_attr.op('NOT LIKE')('target:%')]
            query = query.filter(or_(*conditions))
        except AttributeError:
            LOG.debug("'migration_status' column could not be found.")
            return None

    # 'host' and 'cluster_name' support prefix/pool-aware matching via
    # _filter_host, so they are pulled out before the exact-match pass.
    host = filters.pop('host', None)
    if host:
        query = query.filter(_filter_host(models.Volume.host, host))

    cluster_name = filters.pop('cluster_name', None)
    if cluster_name:
        query = query.filter(_filter_host(models.Volume.cluster_name,
                                          cluster_name))

    # Time filters carry a dict of comparisons consumed by
    # _filter_time_comparison rather than a single literal value.
    for time_comparison_filter in ['created_at', 'updated_at']:
        if filters.get(time_comparison_filter, None):
            time_filter_dict = filters.pop(time_comparison_filter)
            try:
                time_filter_attr = getattr(models.Volume,
                                           time_comparison_filter)
                query = query.filter(_filter_time_comparison(time_filter_attr,
                                                             time_filter_dict))
            except AttributeError:
                LOG.debug("%s column could not be found.",
                          time_comparison_filter)
                return None
    # Apply exact match filters for everything else, ensure that the
    # filter value exists on the model.  This is a validation-only pass:
    # any invalid key/value aborts the whole query with None before
    # anything further is applied.
    for key in filters.keys():
        # metadata/glance_metadata is unique, must be a dict
        if key in ('metadata', 'glance_metadata'):
            if not isinstance(filters[key], dict):
                LOG.debug("'%s' filter value is not valid.", key)
                return None
            continue
        try:
            column_attr = getattr(models.Volume, key)
            # Do not allow relationship properties since those require
            # schema specific knowledge
            prop = getattr(column_attr, 'property')
            if isinstance(prop, RelationshipProperty):
                LOG.debug(("'%s' filter key is not valid, "
                           "it maps to a relationship."), key)
                return None
        except AttributeError:
            LOG.debug("'%s' filter key is not valid.", key)
            return None

    # Holds the simple exact matches
    filter_dict = {}

    # Iterate over all filters, special case the filter if necessary
    for key, value in filters.items():
        if key == 'metadata':
            # model.VolumeMetadata defines the backref to Volumes as
            # 'volume_metadata' or 'volume_admin_metadata', use those as
            # column attribute keys
            col_attr = getattr(models.Volume, 'volume_metadata')
            col_ad_attr = getattr(models.Volume, 'volume_admin_metadata')
            # Each metadata pair may live in either the user or the admin
            # metadata table, hence the OR of the two .any() clauses.
            for k, v in value.items():
                query = query.filter(or_(col_attr.any(key=k, value=v),
                                         col_ad_attr.any(key=k, value=v)))
        elif key == 'glance_metadata':
            # use models.Volume.volume_glance_metadata as column attribute key.
            col_gl_attr = models.Volume.volume_glance_metadata
            for k, v in value.items():
                query = query.filter(col_gl_attr.any(key=k, value=v))
        elif isinstance(value, (list, tuple, set, frozenset)):
            # Looking for values in a list; apply to query directly
            column_attr = getattr(models.Volume, key)
            query = query.filter(column_attr.in_(value))
        else:
            # OK, simple exact match; save for later
            filter_dict[key] = value

    # Apply simple exact matches
    if filter_dict:
        query = query.filter_by(**filter_dict)
    return query
def process_sort_params(sort_keys, sort_dirs, default_keys=None,
default_dir='asc'):
"""Process the sort parameters to include default keys.
Creates a list of sort keys and a list of sort directions. Adds the default
keys to the end of the list if they are not already included.
When adding the default keys to the sort keys list, the associated
direction is:
1) The first element in the 'sort_dirs' list (if specified), else
2) 'default_dir' value (Note that 'asc' is the default value since this is
the default in sqlalchemy.utils.paginate_query)
:param sort_keys: List of sort keys to include in the processed list
:param sort_dirs: List of sort directions to include in the processed list
:param default_keys: List of sort keys that need to be included in the
processed list, they are added at the end of the list
if not already specified.
:param default_dir: Sort direction associated with each of the default
keys that are not supplied, used when they are added
to the processed list
:returns: list of sort keys, list of sort directions
:raise exception.InvalidInput: If more sort directions than sort keys
are specified or if an invalid sort
direction is specified
"""
if default_keys is None:
default_keys = ['created_at'