magnum/magnum/db/sqlalchemy/api.py

614 lines
23 KiB
Python

# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""SQLAlchemy storage backend."""
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import session as db_session
from oslo_db.sqlalchemy import utils as db_utils
from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
from sqlalchemy.orm.exc import MultipleResultsFound
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql import func
from magnum.common import exception
import magnum.conf
from magnum.db import api
from magnum.db.sqlalchemy import models
from magnum.i18n import _
CONF = magnum.conf.CONF
_FACADE = None
def _create_facade_lazily():
global _FACADE
if _FACADE is None:
_FACADE = db_session.EngineFacade.from_config(CONF)
return _FACADE
def get_engine():
facade = _create_facade_lazily()
return facade.get_engine()
def get_session(**kwargs):
facade = _create_facade_lazily()
return facade.get_session(**kwargs)
def get_backend():
"""The backend is this module itself."""
return Connection()
def model_query(model, *args, **kwargs):
"""Query helper for simpler session usage.
:param session: if present, the session to use
"""
session = kwargs.get('session') or get_session()
query = session.query(model, *args)
return query
def add_identity_filter(query, value):
"""Adds an identity filter to a query.
Filters results by ID, if supplied value is a valid integer.
Otherwise attempts to filter results by UUID.
:param query: Initial query to add filter to.
:param value: Value for filtering results by.
:return: Modified query.
"""
if strutils.is_int_like(value):
return query.filter_by(id=value)
elif uuidutils.is_uuid_like(value):
return query.filter_by(uuid=value)
else:
raise exception.InvalidIdentity(identity=value)
def _paginate_query(model, limit=None, marker=None, sort_key=None,
sort_dir=None, query=None):
if not query:
query = model_query(model)
sort_keys = ['id']
if sort_key and sort_key not in sort_keys:
sort_keys.insert(0, sort_key)
try:
query = db_utils.paginate_query(query, model, limit, sort_keys,
marker=marker, sort_dir=sort_dir)
except db_exc.InvalidSortKey:
raise exception.InvalidParameterValue(
_('The sort_key value "%(key)s" is an invalid field for sorting')
% {'key': sort_key})
return query.all()
class Connection(api.Connection):
"""SqlAlchemy connection."""
def __init__(self):
pass
def _add_tenant_filters(self, context, query):
if context.is_admin and context.all_tenants:
return query
if context.project_id:
query = query.filter_by(project_id=context.project_id)
else:
query = query.filter_by(user_id=context.user_id)
return query
def _add_clusters_filters(self, query, filters):
if filters is None:
filters = {}
possible_filters = ["cluster_template_id", "name", "node_count",
"master_count", "stack_id", "api_address",
"node_addresses", "project_id", "user_id"]
filter_names = set(filters).intersection(possible_filters)
filter_dict = {filter_name: filters[filter_name]
for filter_name in filter_names}
query = query.filter_by(**filter_dict)
if 'status' in filters:
query = query.filter(models.Cluster.status.in_(filters['status']))
return query
def get_cluster_list(self, context, filters=None, limit=None, marker=None,
sort_key=None, sort_dir=None):
query = model_query(models.Cluster)
query = self._add_tenant_filters(context, query)
query = self._add_clusters_filters(query, filters)
return _paginate_query(models.Cluster, limit, marker,
sort_key, sort_dir, query)
def create_cluster(self, values):
# ensure defaults are present for new clusters
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
cluster = models.Cluster()
cluster.update(values)
try:
cluster.save()
except db_exc.DBDuplicateEntry:
raise exception.ClusterAlreadyExists(uuid=values['uuid'])
return cluster
def get_cluster_by_id(self, context, cluster_id):
query = model_query(models.Cluster)
query = self._add_tenant_filters(context, query)
query = query.filter_by(id=cluster_id)
try:
return query.one()
except NoResultFound:
raise exception.ClusterNotFound(cluster=cluster_id)
def get_cluster_by_name(self, context, cluster_name):
query = model_query(models.Cluster)
query = self._add_tenant_filters(context, query)
query = query.filter_by(name=cluster_name)
try:
return query.one()
except MultipleResultsFound:
raise exception.Conflict('Multiple clusters exist with same name.'
' Please use the cluster uuid instead.')
except NoResultFound:
raise exception.ClusterNotFound(cluster=cluster_name)
def get_cluster_by_uuid(self, context, cluster_uuid):
query = model_query(models.Cluster)
query = self._add_tenant_filters(context, query)
query = query.filter_by(uuid=cluster_uuid)
try:
return query.one()
except NoResultFound:
raise exception.ClusterNotFound(cluster=cluster_uuid)
def get_cluster_stats(self, context, project_id=None):
query = model_query(models.Cluster)
node_count_col = models.Cluster.node_count
master_count_col = models.Cluster.master_count
ncfunc = func.sum(node_count_col + master_count_col)
if project_id:
query = query.filter_by(project_id=project_id)
nquery = query.session.query(ncfunc.label("nodes")).filter_by(
project_id=project_id)
else:
nquery = query.session.query(ncfunc.label("nodes"))
clusters = query.count()
nodes = int(nquery.one()[0]) if nquery.one()[0] else 0
return clusters, nodes
def get_cluster_count_all(self, context, filters=None):
query = model_query(models.Cluster)
query = self._add_tenant_filters(context, query)
query = self._add_clusters_filters(query, filters)
return query.count()
def destroy_cluster(self, cluster_id):
session = get_session()
with session.begin():
query = model_query(models.Cluster, session=session)
query = add_identity_filter(query, cluster_id)
try:
query.one()
except NoResultFound:
raise exception.ClusterNotFound(cluster=cluster_id)
query.delete()
def update_cluster(self, cluster_id, values):
# NOTE(dtantsur): this can lead to very strange errors
if 'uuid' in values:
msg = _("Cannot overwrite UUID for an existing Cluster.")
raise exception.InvalidParameterValue(err=msg)
return self._do_update_cluster(cluster_id, values)
def _do_update_cluster(self, cluster_id, values):
session = get_session()
with session.begin():
query = model_query(models.Cluster, session=session)
query = add_identity_filter(query, cluster_id)
try:
ref = query.with_lockmode('update').one()
except NoResultFound:
raise exception.ClusterNotFound(cluster=cluster_id)
ref.update(values)
return ref
def _add_cluster_template_filters(self, query, filters):
if filters is None:
filters = {}
possible_filters = ["name", "image_id", "flavor_id",
"master_flavor_id", "keypair_id",
"external_network_id", "dns_nameserver",
"project_id", "user_id", "labels"]
filter_names = set(filters).intersection(possible_filters)
filter_dict = {filter_name: filters[filter_name]
for filter_name in filter_names}
return query.filter_by(**filter_dict)
def get_cluster_template_list(self, context, filters=None, limit=None,
marker=None, sort_key=None, sort_dir=None):
query = model_query(models.ClusterTemplate)
query = self._add_tenant_filters(context, query)
query = self._add_cluster_template_filters(query, filters)
# include public ClusterTemplates
public_q = model_query(models.ClusterTemplate).filter_by(public=True)
query = query.union(public_q)
return _paginate_query(models.ClusterTemplate, limit, marker,
sort_key, sort_dir, query)
def create_cluster_template(self, values):
# ensure defaults are present for new ClusterTemplates
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
cluster_template = models.ClusterTemplate()
cluster_template.update(values)
try:
cluster_template.save()
except db_exc.DBDuplicateEntry:
raise exception.ClusterTemplateAlreadyExists(uuid=values['uuid'])
return cluster_template
def get_cluster_template_by_id(self, context, cluster_template_id):
query = model_query(models.ClusterTemplate)
query = self._add_tenant_filters(context, query)
public_q = model_query(models.ClusterTemplate).filter_by(public=True)
query = query.union(public_q)
query = query.filter_by(id=cluster_template_id)
try:
return query.one()
except NoResultFound:
raise exception.ClusterTemplateNotFound(
clustertemplate=cluster_template_id)
def get_cluster_template_by_uuid(self, context, cluster_template_uuid):
query = model_query(models.ClusterTemplate)
query = self._add_tenant_filters(context, query)
public_q = model_query(models.ClusterTemplate).filter_by(public=True)
query = query.union(public_q)
query = query.filter_by(uuid=cluster_template_uuid)
try:
return query.one()
except NoResultFound:
raise exception.ClusterTemplateNotFound(
clustertemplate=cluster_template_uuid)
def get_cluster_template_by_name(self, context, cluster_template_name):
query = model_query(models.ClusterTemplate)
query = self._add_tenant_filters(context, query)
public_q = model_query(models.ClusterTemplate).filter_by(public=True)
query = query.union(public_q)
query = query.filter_by(name=cluster_template_name)
try:
return query.one()
except MultipleResultsFound:
raise exception.Conflict('Multiple ClusterTemplates exist with'
' same name. Please use the '
'ClusterTemplate uuid instead.')
except NoResultFound:
raise exception.ClusterTemplateNotFound(
clustertemplate=cluster_template_name)
def _is_cluster_template_referenced(self, session, cluster_template_uuid):
"""Checks whether the ClusterTemplate is referenced by cluster(s)."""
query = model_query(models.Cluster, session=session)
query = self._add_clusters_filters(query, {'cluster_template_id':
cluster_template_uuid})
return query.count() != 0
def _is_publishing_cluster_template(self, values):
if (len(values) == 1 and
'public' in values and values['public'] is True):
return True
return False
def destroy_cluster_template(self, cluster_template_id):
session = get_session()
with session.begin():
query = model_query(models.ClusterTemplate, session=session)
query = add_identity_filter(query, cluster_template_id)
try:
cluster_template_ref = query.one()
except NoResultFound:
raise exception.ClusterTemplateNotFound(
clustertemplate=cluster_template_id)
if self._is_cluster_template_referenced(
session, cluster_template_ref['uuid']):
raise exception.ClusterTemplateReferenced(
clustertemplate=cluster_template_id)
query.delete()
def update_cluster_template(self, cluster_template_id, values):
# NOTE(dtantsur): this can lead to very strange errors
if 'uuid' in values:
msg = _("Cannot overwrite UUID for an existing ClusterTemplate.")
raise exception.InvalidParameterValue(err=msg)
return self._do_update_cluster_template(cluster_template_id, values)
def _do_update_cluster_template(self, cluster_template_id, values):
session = get_session()
with session.begin():
query = model_query(models.ClusterTemplate, session=session)
query = add_identity_filter(query, cluster_template_id)
try:
ref = query.with_lockmode('update').one()
except NoResultFound:
raise exception.ClusterTemplateNotFound(
clustertemplate=cluster_template_id)
if self._is_cluster_template_referenced(session, ref['uuid']):
# we only allow to update ClusterTemplate to be public
if not self._is_publishing_cluster_template(values):
raise exception.ClusterTemplateReferenced(
clustertemplate=cluster_template_id)
ref.update(values)
return ref
def create_x509keypair(self, values):
# ensure defaults are present for new x509keypairs
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
x509keypair = models.X509KeyPair()
x509keypair.update(values)
try:
x509keypair.save()
except db_exc.DBDuplicateEntry:
raise exception.X509KeyPairAlreadyExists(uuid=values['uuid'])
return x509keypair
def get_x509keypair_by_id(self, context, x509keypair_id):
query = model_query(models.X509KeyPair)
query = self._add_tenant_filters(context, query)
query = query.filter_by(id=x509keypair_id)
try:
return query.one()
except NoResultFound:
raise exception.X509KeyPairNotFound(x509keypair=x509keypair_id)
def get_x509keypair_by_uuid(self, context, x509keypair_uuid):
query = model_query(models.X509KeyPair)
query = self._add_tenant_filters(context, query)
query = query.filter_by(uuid=x509keypair_uuid)
try:
return query.one()
except NoResultFound:
raise exception.X509KeyPairNotFound(x509keypair=x509keypair_uuid)
def destroy_x509keypair(self, x509keypair_id):
session = get_session()
with session.begin():
query = model_query(models.X509KeyPair, session=session)
query = add_identity_filter(query, x509keypair_id)
count = query.delete()
if count != 1:
raise exception.X509KeyPairNotFound(x509keypair_id)
def update_x509keypair(self, x509keypair_id, values):
# NOTE(dtantsur): this can lead to very strange errors
if 'uuid' in values:
msg = _("Cannot overwrite UUID for an existing X509KeyPair.")
raise exception.InvalidParameterValue(err=msg)
return self._do_update_x509keypair(x509keypair_id, values)
def _do_update_x509keypair(self, x509keypair_id, values):
session = get_session()
with session.begin():
query = model_query(models.X509KeyPair, session=session)
query = add_identity_filter(query, x509keypair_id)
try:
ref = query.with_lockmode('update').one()
except NoResultFound:
raise exception.X509KeyPairNotFound(x509keypair=x509keypair_id)
ref.update(values)
return ref
def _add_x509keypairs_filters(self, query, filters):
if filters is None:
filters = {}
if 'project_id' in filters:
query = query.filter_by(project_id=filters['project_id'])
if 'user_id' in filters:
query = query.filter_by(user_id=filters['user_id'])
return query
def get_x509keypair_list(self, context, filters=None, limit=None,
marker=None, sort_key=None, sort_dir=None):
query = model_query(models.X509KeyPair)
query = self._add_tenant_filters(context, query)
query = self._add_x509keypairs_filters(query, filters)
return _paginate_query(models.X509KeyPair, limit, marker,
sort_key, sort_dir, query)
def destroy_magnum_service(self, magnum_service_id):
session = get_session()
with session.begin():
query = model_query(models.MagnumService, session=session)
query = add_identity_filter(query, magnum_service_id)
count = query.delete()
if count != 1:
raise exception.MagnumServiceNotFound(
magnum_service_id=magnum_service_id)
def update_magnum_service(self, magnum_service_id, values):
session = get_session()
with session.begin():
query = model_query(models.MagnumService, session=session)
query = add_identity_filter(query, magnum_service_id)
try:
ref = query.with_lockmode('update').one()
except NoResultFound:
raise exception.MagnumServiceNotFound(
magnum_service_id=magnum_service_id)
if 'report_count' in values:
if values['report_count'] > ref.report_count:
ref.last_seen_up = timeutils.utcnow()
ref.update(values)
return ref
def get_magnum_service_by_host_and_binary(self, host, binary):
query = model_query(models.MagnumService)
query = query.filter_by(host=host, binary=binary)
try:
return query.one()
except NoResultFound:
return None
def create_magnum_service(self, values):
magnum_service = models.MagnumService()
magnum_service.update(values)
try:
magnum_service.save()
except db_exc.DBDuplicateEntry:
raise exception.MagnumServiceAlreadyExists(id=magnum_service['id'])
return magnum_service
def get_magnum_service_list(self, disabled=None, limit=None,
marker=None, sort_key=None, sort_dir=None
):
query = model_query(models.MagnumService)
if disabled:
query = query.filter_by(disabled=disabled)
return _paginate_query(models.MagnumService, limit, marker,
sort_key, sort_dir, query)
def create_quota(self, values):
quotas = models.Quota()
quotas.update(values)
try:
quotas.save()
except db_exc.DBDuplicateEntry:
raise exception.QuotaAlreadyExists(project_id=values['project_id'],
resource=values['resource'])
return quotas
def _add_quota_filters(self, query, filters):
if filters is None:
filters = {}
possible_filters = ["resource", "project_id"]
filter_names = set(filters).intersection(possible_filters)
filter_dict = {filter_name: filters[filter_name]
for filter_name in filter_names}
query = query.filter_by(**filter_dict)
return query
def get_quota_list(self, context, filters=None, limit=None, marker=None,
sort_key=None, sort_dir=None):
query = model_query(models.Quota)
query = self._add_quota_filters(query, filters)
return _paginate_query(models.Quota, limit, marker,
sort_key, sort_dir, query)
def update_quota(self, project_id, values):
session = get_session()
with session.begin():
query = model_query(models.Quota, session=session)
resource = values['resource']
try:
query = query.filter_by(project_id=project_id).filter_by(
resource=resource)
ref = query.with_lockmode('update').one()
except NoResultFound:
msg = (_('project_id %(project_id)s resource %(resource)s.') %
{'project_id': project_id, 'resource': resource})
raise exception.QuotaNotFound(msg=msg)
ref.update(values)
return ref
def delete_quota(self, project_id, resource):
session = get_session()
with session.begin():
query = model_query(models.Quota, session=session)
try:
query.filter_by(project_id=project_id).filter_by(
resource=resource).one()
except NoResultFound:
msg = (_('project_id %(project_id)s resource %(resource)s.') %
{'project_id': project_id, 'resource': resource})
raise exception.QuotaNotFound(msg=msg)
query.delete()
def get_quota_by_id(self, context, quota_id):
query = model_query(models.Quota)
query = query.filter_by(id=quota_id)
try:
return query.one()
except NoResultFound:
msg = _('quota id %s .') % quota_id
raise exception.QuotaNotFound(msg=msg)
def quota_get_all_by_project_id(self, project_id):
query = model_query(models.Quota)
result = query.filter_by(project_id=project_id).all()
return result
def get_quota_by_project_id_resource(self, project_id, resource):
query = model_query(models.Quota)
query = query.filter_by(project_id=project_id).filter_by(
resource=resource)
try:
return query.one()
except NoResultFound:
msg = (_('project_id %(project_id)s resource %(resource)s.') %
{'project_id': project_id, 'resource': resource})
raise exception.QuotaNotFound(msg=msg)