a6a6b84258
This change adds the capability to rename the subcloud after bootstrap or during subcloud rehome operation. Added a field in the database to separate the region name from the subcloud name. The region name determines the subcloud reference in the Openstack core, through which it is possible to access the endpoints of a given subcloud. Since the region name cannot be changed, this commit adds the ability to maintain a unique region name based on the UUID format, and allows subcloud renaming when necessary without any endpoint impact. The region is randomly generated to configure the subcloud when it is created and only applies to future subclouds. For those systems that have existing subclouds, the region will be the same as on day 0, that is, region will keep the same name as the subcloud, but subclouds can be renamed. This topic involves changes to dcmanager, dcmanager-client and GUI. To ensure the region name reference needed by the cert-monitor, a mechanism to determine if the request is coming from the cert-monitor has been created. Usage for subcloud rename: dcmanager subcloud update <subcloud-name> --name <new-name> Usage for subcloud rehoming: dcmanager subcloud add --name <subcloud-name> --migrate ... Note: Upgrade test from StarlingX 8 -> 9 for this commit is deferred until upgrade functionality in master is restored. Any issue found during upgrade test will be addressed in a separate commit Test Plan: PASS: Run dcmanager subcloud passing subcommands: - add/delete/migrate/list/show/show --detail - errors/manage/unmanage/reinstall/reconfig - update/deploy PASS: Run dcmanager subcloud add supplying --name parameter and validate the operation is not allowed PASS: Run dcmanager supplying subcommands: - kube/patch/prestage strategies PASS: Run dcmanager to apply patch and remove it PASS: Run dcmanager subcloud-backup: - create/delete/restore/show/upload PASS: Run subcloud-group: - add/delete/list/list-subclouds/show/update PASS: Run dcmanager subcloud strategy for: - patch/kubernetes/firmware PASS: Run dcmanager subcloud update command passing --name parameter supplying the following values: - current subcloud name (not changed) - different existing subcloud name PASS: Run dcmanager to migrate a subcloud passing --name parameter supplying a new subcloud name PASS: Run dcmanager to migrate a subcloud without --name parameter PASS: Run dcmanager to migrate a subcloud passing --name parameter supplying a new subcloud name and different subcloud name in bootstrap file PASS: Test dcmanager API response using cURL command line to validate new region name field PASS: Run full DC sanity and regression Story: 2010788 Task: 48217 Signed-off-by: Cristian Mondo <cristian.mondo@windriver.com> Change-Id: Id04f42504b8e325d9ec3880c240fe4a06e3a20b7
1092 lines
35 KiB
Python
1092 lines
35 KiB
Python
# Copyright (c) 2015 Ericsson AB.
|
|
# Copyright (c) 2017-2021 Wind River Systems, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
|
|
"""
|
|
Implementation of SQLAlchemy backend.
|
|
"""
|
|
|
|
import datetime
|
|
import sys
|
|
import threading
|
|
|
|
from oslo_db import api as oslo_db_api
|
|
from oslo_db import exception as db_exc
|
|
from oslo_db.sqlalchemy import enginefacade
|
|
|
|
from oslo_log import log as logging
|
|
from oslo_utils import strutils
|
|
from oslo_utils import timeutils
|
|
from oslo_utils import uuidutils
|
|
|
|
from sqlalchemy import asc
|
|
from sqlalchemy import desc
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.orm.exc import MultipleResultsFound
|
|
from sqlalchemy.orm.exc import NoResultFound
|
|
from sqlalchemy.orm import joinedload_all
|
|
|
|
from dcorch.common import consts
|
|
from dcorch.common import exceptions as exception
|
|
from dcorch.common.i18n import _
|
|
from dcorch.db.sqlalchemy import migration
|
|
from dcorch.db.sqlalchemy import models
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
_facade = None
|
|
|
|
_main_context_manager = None
|
|
_CONTEXT = threading.local()
|
|
|
|
|
|
def _get_main_context_manager():
|
|
global _main_context_manager
|
|
if not _main_context_manager:
|
|
_main_context_manager = enginefacade.transaction_context()
|
|
return _main_context_manager
|
|
|
|
|
|
def get_engine():
|
|
return _get_main_context_manager().get_legacy_facade().get_engine()
|
|
|
|
|
|
def get_session():
|
|
return _get_main_context_manager().get_legacy_facade().get_session()
|
|
|
|
|
|
def read_session():
|
|
return _get_main_context_manager().reader.using(_CONTEXT)
|
|
|
|
|
|
def write_session():
|
|
return _get_main_context_manager().writer.using(_CONTEXT)
|
|
|
|
|
|
_DEFAULT_QUOTA_NAME = 'default'
|
|
|
|
|
|
def get_backend():
|
|
"""The backend is this module itself."""
|
|
return sys.modules[__name__]
|
|
|
|
|
|
def model_query(context, *args, **kwargs):
|
|
session = kwargs.get('session')
|
|
if session:
|
|
return session.query(*args).options(joinedload_all('*'))
|
|
else:
|
|
with read_session() as session:
|
|
return session.query(*args).options(joinedload_all('*'))
|
|
|
|
|
|
def _session(context):
|
|
return get_session()
|
|
|
|
|
|
def is_admin_context(context):
|
|
"""Indicate if the request context is an administrator."""
|
|
if not context:
|
|
LOG.warning(_('Use of empty request context is deprecated'),
|
|
DeprecationWarning)
|
|
raise Exception('die')
|
|
return context.is_admin
|
|
|
|
|
|
def is_user_context(context):
|
|
"""Indicate if the request context is a normal user."""
|
|
if not context:
|
|
return False
|
|
if context.is_admin:
|
|
return False
|
|
if not context.user or not context.project:
|
|
return False
|
|
return True
|
|
|
|
|
|
def require_admin_context(f):
|
|
"""Decorator to require admin request context.
|
|
|
|
The first argument to the wrapped function must be the context.
|
|
"""
|
|
def wrapper(*args, **kwargs):
|
|
if not is_admin_context(args[0]):
|
|
raise exception.AdminRequired()
|
|
return f(*args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
def require_context(f):
|
|
"""Decorator to require *any* user or admin context.
|
|
|
|
This does no authorization for user or project access matching, see
|
|
:py:func:`authorize_project_context` and
|
|
:py:func:`authorize_user_context`.
|
|
The first argument to the wrapped function must be the context.
|
|
|
|
"""
|
|
def wrapper(*args, **kwargs):
|
|
if not is_admin_context(args[0]) and not is_user_context(args[0]):
|
|
raise exception.NotAuthorized()
|
|
return f(*args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
###################
|
|
|
|
|
|
@require_context
|
|
def _quota_get(context, project_id, resource, session=None):
|
|
result = model_query(context, models.Quota). \
|
|
filter_by(project_id=project_id). \
|
|
filter_by(resource=resource). \
|
|
first()
|
|
|
|
if not result:
|
|
raise exception.ProjectQuotaNotFound(project_id=project_id)
|
|
|
|
return result
|
|
|
|
|
|
@require_context
|
|
def quota_get(context, project_id, resource):
|
|
return _quota_get(context, project_id, resource)
|
|
|
|
|
|
@require_context
|
|
def quota_get_all_by_project(context, project_id):
|
|
rows = model_query(context, models.Quota). \
|
|
filter_by(project_id=project_id). \
|
|
all()
|
|
result = {'project_id': project_id}
|
|
for row in rows:
|
|
result[row.resource] = row.hard_limit
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def quota_create(context, project_id, resource, limit):
|
|
with write_session() as session:
|
|
quota_ref = models.Quota()
|
|
quota_ref.project_id = project_id
|
|
quota_ref.resource = resource
|
|
quota_ref.hard_limit = limit
|
|
session.add(quota_ref)
|
|
return quota_ref
|
|
|
|
|
|
@require_admin_context
|
|
def quota_update(context, project_id, resource, limit):
|
|
with write_session() as session:
|
|
quota_ref = _quota_get(context, project_id, resource, session=session)
|
|
if not quota_ref:
|
|
raise exception.ProjectQuotaNotFound(project_id=project_id)
|
|
quota_ref.hard_limit = limit
|
|
quota_ref.save(session)
|
|
return quota_ref
|
|
|
|
|
|
@require_admin_context
|
|
def quota_destroy(context, project_id, resource):
|
|
with write_session() as session:
|
|
quota_ref = _quota_get(context, project_id, resource, session=session)
|
|
if not quota_ref:
|
|
raise exception.ProjectQuotaNotFound(project_id=project_id)
|
|
session.delete(quota_ref)
|
|
|
|
|
|
@require_admin_context
|
|
def quota_destroy_all(context, project_id):
|
|
with write_session() as session:
|
|
|
|
quotas = model_query(context, models.Quota). \
|
|
filter_by(project_id=project_id). \
|
|
all()
|
|
|
|
if not quotas:
|
|
raise exception.ProjectQuotaNotFound(project_id=project_id)
|
|
|
|
for quota_ref in quotas:
|
|
session.delete(quota_ref)
|
|
|
|
|
|
##########################
|
|
|
|
@require_context
|
|
def _quota_class_get(context, class_name, resource):
|
|
result = model_query(context, models.QuotaClass). \
|
|
filter_by(deleted=0). \
|
|
filter_by(class_name=class_name). \
|
|
filter_by(resource=resource). \
|
|
first()
|
|
|
|
if not result:
|
|
raise exception.QuotaClassNotFound(class_name=class_name)
|
|
|
|
return result
|
|
|
|
|
|
@require_context
|
|
def quota_class_get(context, class_name, resource):
|
|
return _quota_class_get(context, class_name, resource)
|
|
|
|
|
|
@require_context
|
|
def quota_class_get_default(context):
|
|
return quota_class_get_all_by_name(context, _DEFAULT_QUOTA_NAME)
|
|
|
|
|
|
@require_context
|
|
def quota_class_get_all_by_name(context, class_name):
|
|
rows = model_query(context, models.QuotaClass). \
|
|
filter_by(deleted=0). \
|
|
filter_by(class_name=class_name). \
|
|
all()
|
|
|
|
result = {'class_name': class_name}
|
|
for row in rows:
|
|
result[row.resource] = row.hard_limit
|
|
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def quota_class_create(context, class_name, resource, limit):
|
|
with write_session() as session:
|
|
quota_class_ref = models.QuotaClass()
|
|
quota_class_ref.class_name = class_name
|
|
quota_class_ref.resource = resource
|
|
quota_class_ref.hard_limit = limit
|
|
session.add(quota_class_ref)
|
|
return quota_class_ref
|
|
|
|
|
|
@require_admin_context
|
|
def quota_class_update(context, class_name, resource, limit):
|
|
with write_session() as session:
|
|
quota_class_ref = session.query(models.QuotaClass). \
|
|
filter_by(deleted=0). \
|
|
filter_by(class_name=class_name). \
|
|
filter_by(resource=resource).first()
|
|
if not quota_class_ref:
|
|
raise exception.QuotaClassNotFound(class_name=class_name)
|
|
quota_class_ref.hard_limit = limit
|
|
quota_class_ref.save(session)
|
|
return quota_class_ref
|
|
|
|
|
|
@require_admin_context
|
|
def quota_class_destroy_all(context, class_name):
|
|
with write_session() as session:
|
|
quota_classes = session.query(models.QuotaClass). \
|
|
filter_by(deleted=0). \
|
|
filter_by(class_name=class_name). \
|
|
all()
|
|
if quota_classes:
|
|
for quota_class_ref in quota_classes:
|
|
session.delete(quota_class_ref)
|
|
else:
|
|
raise exception.QuotaClassNotFound()
|
|
|
|
|
|
def db_sync(engine, version=None):
|
|
"""Migrate the database to `version` or the most recent version."""
|
|
return migration.db_sync(engine, version=version)
|
|
|
|
|
|
def db_version(engine):
|
|
"""Display the current database version."""
|
|
return migration.db_version(engine)
|
|
|
|
|
|
def service_create(context, service_id, host=None, binary=None,
|
|
topic=None):
|
|
with write_session() as session:
|
|
time_now = timeutils.utcnow()
|
|
svc = models.Service(id=service_id,
|
|
host=host,
|
|
binary=binary,
|
|
topic=topic,
|
|
created_at=time_now,
|
|
updated_at=time_now)
|
|
session.add(svc)
|
|
return svc
|
|
|
|
|
|
def service_update(context, service_id, values=None):
|
|
with write_session() as session:
|
|
service = session.query(models.Service).get(service_id)
|
|
if not service:
|
|
return
|
|
|
|
if values is None:
|
|
values = {}
|
|
|
|
values.update({'updated_at': timeutils.utcnow()})
|
|
service.update(values)
|
|
service.save(session)
|
|
return service
|
|
|
|
|
|
def service_delete(context, service_id):
|
|
with write_session() as session:
|
|
session.query(models.Service).filter_by(
|
|
id=service_id).delete(synchronize_session='fetch')
|
|
|
|
|
|
def service_get(context, service_id):
|
|
return model_query(context, models.Service).get(service_id)
|
|
|
|
|
|
def service_get_all(context):
|
|
return model_query(context, models.Service).all()
|
|
|
|
|
|
##########################
|
|
|
|
# dbapi for orchestrator
|
|
def add_identity_filter(query, value,
|
|
use_region_name=None,
|
|
use_resource_type=None):
|
|
"""Adds an identity filter to a query.
|
|
|
|
Filters results by 'id', if supplied value is a valid integer.
|
|
then attempts to filter results by 'uuid';
|
|
otherwise filters by name
|
|
|
|
:param query: Initial query to add filter to.
|
|
:param value: Value for filtering results by.
|
|
:param use_region_name: Use region_name in filter
|
|
:param use_resource_type: Use resource_type in filter
|
|
|
|
:return: Modified query.
|
|
"""
|
|
if use_region_name:
|
|
return query.filter_by(region_name=value)
|
|
elif strutils.is_int_like(value):
|
|
return query.filter_by(id=value)
|
|
elif uuidutils.is_uuid_like(value):
|
|
return query.filter_by(uuid=value)
|
|
elif use_resource_type:
|
|
return query.filter_by(resource_type=value)
|
|
else:
|
|
return query.filter_by(name=value)
|
|
|
|
|
|
def add_filter_by_many_identities(query, model, values):
|
|
"""Adds an identity filter to a query for values list.
|
|
|
|
Filters results by ID, if supplied values contain a valid integer.
|
|
Otherwise attempts to filter results by UUID.
|
|
|
|
:param query: Initial query to add filter to.
|
|
:param model: Model for filter.
|
|
:param values: Values for filtering results by.
|
|
:return: tuple (Modified query, filter field name).
|
|
"""
|
|
if not values:
|
|
raise exception.Invalid()
|
|
value = values[0]
|
|
if strutils.is_int_like(value):
|
|
return query.filter(getattr(model, 'id').in_(values)), 'id'
|
|
elif uuidutils.is_uuid_like(value):
|
|
return query.filter(getattr(model, 'uuid').in_(values)), 'uuid'
|
|
else:
|
|
raise exception.InvalidParameterValue(
|
|
err="Invalid identity filter value %s" % value)
|
|
|
|
|
|
@require_context
|
|
def _subcloud_get(context, region_id, session=None):
|
|
query = model_query(context, models.Subcloud, session=session). \
|
|
filter_by(deleted=0)
|
|
query = add_identity_filter(query, region_id, use_region_name=True)
|
|
|
|
try:
|
|
return query.one()
|
|
except NoResultFound:
|
|
raise exception.SubcloudNotFound(region_name=region_id)
|
|
except MultipleResultsFound:
|
|
raise exception.InvalidParameterValue(
|
|
err="Multiple entries found for subcloud %s" % region_id)
|
|
|
|
|
|
@require_context
|
|
def subcloud_get(context, region_id):
|
|
return _subcloud_get(context, region_id)
|
|
|
|
|
|
@require_context
|
|
def subcloud_get_all(context, region_name=None,
|
|
management_state=None,
|
|
availability_status=None,
|
|
initial_sync_state=None):
|
|
query = model_query(context, models.Subcloud). \
|
|
filter_by(deleted=0)
|
|
|
|
if region_name:
|
|
query = add_identity_filter(query, region_name, use_region_name=True)
|
|
if management_state:
|
|
query = query.filter_by(management_state=management_state)
|
|
if availability_status:
|
|
query = query.filter_by(availability_status=availability_status)
|
|
if initial_sync_state:
|
|
query = query.filter_by(initial_sync_state=initial_sync_state)
|
|
return query.all()
|
|
|
|
|
|
@require_admin_context
|
|
def subcloud_create(context, region_name, values):
|
|
with write_session() as session:
|
|
result = models.Subcloud()
|
|
result.region_name = region_name
|
|
if not values.get('uuid'):
|
|
values['uuid'] = uuidutils.generate_uuid()
|
|
result.update(values)
|
|
try:
|
|
session.add(result)
|
|
except db_exc.DBDuplicateEntry:
|
|
raise exception.SubcloudAlreadyExists(region_name=region_name)
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def subcloud_update(context, region_name, values):
|
|
with write_session() as session:
|
|
result = _subcloud_get(context, region_name, session)
|
|
result.update(values)
|
|
result.save(session)
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def subcloud_delete(context, region_name):
|
|
with write_session() as session:
|
|
subclouds = session.query(models.Subcloud). \
|
|
filter_by(deleted=0). \
|
|
filter_by(region_name=region_name). \
|
|
all()
|
|
if subclouds:
|
|
for subcloud_ref in subclouds:
|
|
session.delete(subcloud_ref)
|
|
else:
|
|
raise exception.SubcloudNotFound(region_name=region_name)
|
|
|
|
|
|
@require_context
|
|
def _resource_get(context, resource_type, master_id, session):
|
|
query = model_query(context, models.Resource, session=session). \
|
|
filter_by(deleted=0)
|
|
query = query.filter_by(resource_type=resource_type)
|
|
query = query.filter_by(master_id=master_id)
|
|
try:
|
|
return query.one()
|
|
except NoResultFound:
|
|
raise exception.ResourceNotFound(resource_type=resource_type)
|
|
except MultipleResultsFound:
|
|
raise exception.InvalidParameterValue(
|
|
err=("Multiple entries found for resource %(id)s of type %(type)s",
|
|
{'id': master_id, 'type': resource_type}))
|
|
|
|
|
|
@require_context
|
|
def resource_get_by_type_and_master_id(context, resource_type, master_id):
|
|
with read_session() as session:
|
|
return _resource_get(context, resource_type, master_id, session)
|
|
|
|
|
|
@require_context
|
|
def resource_get_by_id(context, resource_id, session=None):
|
|
query = model_query(context, models.Resource, session=session). \
|
|
filter_by(deleted=0)
|
|
query = query.filter_by(id=resource_id)
|
|
try:
|
|
return query.one()
|
|
except NoResultFound:
|
|
raise exception.ResourceNotFound(id=resource_id)
|
|
|
|
|
|
@require_context
|
|
def resource_get_all(context, resource_type=None):
|
|
query = model_query(context, models.Resource). \
|
|
filter_by(deleted=0)
|
|
|
|
if resource_type:
|
|
query = add_identity_filter(query, resource_type,
|
|
use_resource_type=True)
|
|
|
|
return query.all()
|
|
|
|
|
|
@require_admin_context
|
|
def resource_create(context, resource_type, values):
|
|
with write_session() as session:
|
|
result = models.Resource()
|
|
result.resource_type = resource_type
|
|
if not values.get('uuid'):
|
|
values['uuid'] = uuidutils.generate_uuid()
|
|
result.update(values)
|
|
session.add(result)
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def resource_update(context, resource_id, values):
|
|
with write_session() as session:
|
|
result = resource_get_by_id(context, resource_id, session=session)
|
|
result.update(values)
|
|
result.save(session)
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def resource_delete(context, resource_type, master_id):
|
|
with write_session() as session:
|
|
resources = session.query(models.Resource). \
|
|
filter_by(deleted=0). \
|
|
filter_by(resource_type=resource_type). \
|
|
filter_by(master_id=master_id). \
|
|
all()
|
|
if resources:
|
|
for resource_ref in resources:
|
|
session.delete(resource_ref)
|
|
else:
|
|
raise exception.ResourceNotFound(resource_type=resource_type)
|
|
|
|
|
|
def add_subcloud_resource_filter_by_subcloud(query, value):
|
|
if strutils.is_int_like(value):
|
|
return query.filter(models.Subcloud.id == value)
|
|
elif uuidutils.is_uuid_like(value):
|
|
return query.filter(models.Subcloud.uuid == value)
|
|
|
|
|
|
@require_context
|
|
def _subcloud_resource_get(context, subcloud_resource_id, session=None):
|
|
query = model_query(context, models.SubcloudResource, session=session). \
|
|
filter_by(deleted=0)
|
|
query = add_identity_filter(query, subcloud_resource_id)
|
|
try:
|
|
return query.one()
|
|
except NoResultFound:
|
|
raise exception.SubcloudResourceNotFound(resource=subcloud_resource_id)
|
|
|
|
|
|
@require_context
|
|
def subcloud_resource_get(context, subcloud_resource_id):
|
|
return _subcloud_resource_get(context, subcloud_resource_id)
|
|
|
|
|
|
@require_context
|
|
def subcloud_resources_get_by_subcloud(context, subcloud_id):
|
|
query = model_query(context, models.SubcloudResource). \
|
|
filter_by(deleted=0)
|
|
if subcloud_id:
|
|
query = (query.join(models.Subcloud,
|
|
models.Subcloud.id ==
|
|
models.SubcloudResource.subcloud_id))
|
|
query, field = add_filter_by_many_identities(
|
|
query, models.Subcloud, [subcloud_id])
|
|
return query.all()
|
|
|
|
|
|
@require_context
|
|
def subcloud_resources_get_by_resource(context, resource_id):
|
|
# query by resource id or uuid, not resource master uuid.
|
|
query = model_query(context, models.SubcloudResource). \
|
|
filter_by(deleted=0)
|
|
if resource_id:
|
|
query = (query.join(models.Resource,
|
|
models.Resource.id ==
|
|
models.SubcloudResource.resource_id))
|
|
query, field = add_filter_by_many_identities(
|
|
query, models.Resource, [resource_id])
|
|
return query.all()
|
|
|
|
|
|
def subcloud_resources_get_all(context):
|
|
query = model_query(context, models.SubcloudResource). \
|
|
filter_by(deleted=0)
|
|
return query.all()
|
|
|
|
|
|
@require_context
|
|
def subcloud_resource_get_by_resource_and_subcloud(
|
|
context, resource_id, subcloud_id):
|
|
query = model_query(context, models.SubcloudResource). \
|
|
filter_by(deleted=0). \
|
|
filter_by(resource_id=resource_id). \
|
|
filter_by(subcloud_id=subcloud_id)
|
|
try:
|
|
return query.one()
|
|
except NoResultFound:
|
|
raise exception.SubcloudResourceNotFound()
|
|
except MultipleResultsFound:
|
|
raise exception.InvalidParameterValue(
|
|
err=("Multiple entries found for resource %(rid)d "
|
|
"subcloud %(sid)d",
|
|
{'rid': resource_id, 'sid': subcloud_id}))
|
|
|
|
|
|
@require_admin_context
|
|
def subcloud_resource_create(context, subcloud_id, resource_id, values):
|
|
with write_session() as session:
|
|
result = models.SubcloudResource()
|
|
result.subcloud_id = subcloud_id
|
|
result.resource_id = resource_id
|
|
if not values.get('uuid'):
|
|
values['uuid'] = uuidutils.generate_uuid()
|
|
result.update(values)
|
|
try:
|
|
session.add(result)
|
|
except db_exc.DBDuplicateEntry:
|
|
raise exception.SubcloudResourceAlreadyExists(
|
|
subcloud_id=subcloud_id,
|
|
resource_id=resource_id)
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def subcloud_resource_update(context, subcloud_resource_id, values):
|
|
with write_session() as session:
|
|
result = _subcloud_resource_get(context, subcloud_resource_id, session)
|
|
result.update(values)
|
|
result.save(session)
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def subcloud_resource_delete(context, subcloud_resource_id):
|
|
with write_session() as session:
|
|
query = session.query(models.SubcloudResource). \
|
|
filter_by(deleted=0)
|
|
query = add_identity_filter(query, subcloud_resource_id)
|
|
try:
|
|
subcloud_resource_ref = query.one()
|
|
except NoResultFound:
|
|
raise exception.SubcloudResourceNotFound(
|
|
resource=subcloud_resource_id)
|
|
session.delete(subcloud_resource_ref)
|
|
|
|
|
|
def add_orch_job_filter_by_resource(query, value):
|
|
if strutils.is_int_like(value):
|
|
return query.filter(models.OrchJob.id == value)
|
|
elif uuidutils.is_uuid_like(value):
|
|
return query.filter(models.OrchJob.uuid == value)
|
|
|
|
|
|
@require_context
|
|
def _orch_job_get(context, orch_job_id, session=None):
|
|
query = model_query(context, models.OrchJob, session=session). \
|
|
filter_by(deleted=0)
|
|
query = add_identity_filter(query, orch_job_id)
|
|
try:
|
|
return query.one()
|
|
except NoResultFound:
|
|
raise exception.OrchJobNotFound(orch_job=orch_job_id)
|
|
|
|
|
|
@require_context
|
|
def orch_job_get(context, orch_job_id):
|
|
return _orch_job_get(context, orch_job_id)
|
|
|
|
|
|
@require_context
|
|
def orch_job_get_all(context, resource_id=None):
|
|
query = model_query(context, models.OrchJob). \
|
|
filter_by(deleted=0)
|
|
if resource_id:
|
|
query = (query.join(models.Resource,
|
|
models.Resource.id == models.OrchJob.resource_id))
|
|
query, field = add_filter_by_many_identities(
|
|
query, models.Resource, [resource_id])
|
|
return query.all()
|
|
|
|
|
|
@require_admin_context
|
|
def orch_job_create(context, resource_id, endpoint_type,
|
|
operation_type, values):
|
|
with write_session() as session:
|
|
result = models.OrchJob()
|
|
result.resource_id = resource_id
|
|
result.endpoint_type = endpoint_type
|
|
result.operation_type = operation_type
|
|
if not values.get('uuid'):
|
|
values['uuid'] = uuidutils.generate_uuid()
|
|
result.update(values)
|
|
try:
|
|
session.add(result)
|
|
except db_exc.DBDuplicateEntry:
|
|
raise exception.OrchJobAlreadyExists(
|
|
resource_id=resource_id,
|
|
endpoint_type=endpoint_type,
|
|
operation_type=operation_type)
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def orch_job_update(context, orch_job_id, values):
|
|
with write_session() as session:
|
|
result = _orch_job_get(context, orch_job_id, session)
|
|
result.update(values)
|
|
result.save(session)
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def orch_job_delete(context, orch_job_id):
|
|
with write_session() as session:
|
|
query = session.query(models.OrchJob). \
|
|
filter_by(deleted=0)
|
|
query = add_identity_filter(query, orch_job_id)
|
|
try:
|
|
orch_job_ref = query.one()
|
|
except NoResultFound:
|
|
raise exception.OrchJobNotFound(orch_job=orch_job_id)
|
|
session.delete(orch_job_ref)
|
|
|
|
|
|
def add_orch_request_filter_by_resource(query, value):
|
|
if strutils.is_int_like(value):
|
|
return query.filter(models.OrchRequest.id == value)
|
|
elif uuidutils.is_uuid_like(value):
|
|
return query.filter(models.OrchRequest.uuid == value)
|
|
|
|
|
|
@require_context
|
|
def _orch_request_get(context, orch_request_id, session=None):
|
|
query = model_query(context, models.OrchRequest, session=session). \
|
|
filter_by(deleted=0)
|
|
query = add_identity_filter(query, orch_request_id)
|
|
try:
|
|
return query.one()
|
|
except NoResultFound:
|
|
raise exception.OrchRequestNotFound(orch_request=orch_request_id)
|
|
|
|
|
|
@require_context
|
|
def orch_request_get(context, orch_request_id):
|
|
return _orch_request_get(context, orch_request_id)
|
|
|
|
|
|
@require_context
|
|
def orch_request_get_most_recent_failed_request(context):
|
|
query = model_query(context, models.OrchRequest). \
|
|
filter_by(deleted=0). \
|
|
filter_by(state=consts.ORCH_REQUEST_STATE_FAILED)
|
|
|
|
try:
|
|
return query.order_by(desc(models.OrchRequest.updated_at)).first()
|
|
except NoResultFound:
|
|
return None
|
|
|
|
|
|
@require_context
|
|
def orch_request_get_all(context, orch_job_id=None):
|
|
query = model_query(context, models.OrchRequest). \
|
|
filter_by(deleted=0)
|
|
if orch_job_id:
|
|
query = (query.join(models.OrchJob,
|
|
models.OrchJob.id ==
|
|
models.OrchRequest.orch_job_id))
|
|
query, field = add_filter_by_many_identities(
|
|
query, models.OrchJob, [orch_job_id])
|
|
return query.all()
|
|
|
|
|
|
@require_context
|
|
def orch_request_get_by_attrs(context,
|
|
endpoint_type,
|
|
resource_type=None,
|
|
target_region_name=None,
|
|
states=None):
|
|
"""Query OrchRequests by attributes.
|
|
|
|
:param context: authorization context
|
|
:param endpoint_type: OrchJob.endpoint_type
|
|
:param resource_type: Resource.resource_type
|
|
:param target_region_name: OrchRequest target_region_name
|
|
:param states: [OrchRequest.state] note: must be a list
|
|
:return: [OrchRequests] sorted by OrchRequest.id
|
|
"""
|
|
query = model_query(context, models.OrchRequest). \
|
|
filter_by(deleted=0)
|
|
|
|
if target_region_name:
|
|
query = query.filter_by(target_region_name=target_region_name)
|
|
|
|
if states:
|
|
states = set(states)
|
|
query = query.filter(models.OrchRequest.state.in_(states))
|
|
|
|
query = query.join(models.OrchJob,
|
|
models.OrchJob.id == models.OrchRequest.orch_job_id). \
|
|
filter_by(endpoint_type=endpoint_type)
|
|
|
|
if resource_type is not None:
|
|
query = query.join(models.Resource,
|
|
models.Resource.id == models.OrchJob.resource_id). \
|
|
filter_by(resource_type=resource_type)
|
|
|
|
# sort by orch_request id
|
|
query = query.order_by(asc(models.OrchRequest.id)).all()
|
|
|
|
return query
|
|
|
|
|
|
@require_admin_context
|
|
def orch_request_create(context, orch_job_id, target_region_name, values):
|
|
with write_session() as session:
|
|
result = models.OrchRequest()
|
|
result.orch_job_id = orch_job_id
|
|
result.target_region_name = target_region_name
|
|
if not values.get('uuid'):
|
|
values['uuid'] = uuidutils.generate_uuid()
|
|
result.update(values)
|
|
try:
|
|
session.add(result)
|
|
except db_exc.DBDuplicateEntry:
|
|
raise exception.OrchRequestAlreadyExists(
|
|
orch_request=orch_job_id,
|
|
target_region_name=target_region_name)
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def orch_request_update(context, orch_request_id, values):
|
|
with write_session() as session:
|
|
result = _orch_request_get(context, orch_request_id, session)
|
|
result.update(values)
|
|
result.save(session)
|
|
return result
|
|
|
|
|
|
@require_admin_context
|
|
def orch_request_destroy(context, orch_request_id):
|
|
with write_session() as session:
|
|
query = session.query(models.OrchRequest). \
|
|
filter_by(deleted=0)
|
|
query = add_identity_filter(query, orch_request_id)
|
|
try:
|
|
orch_request_ref = query.one()
|
|
except NoResultFound:
|
|
raise exception.OrchRequestNotFound(orch_request=orch_request_id)
|
|
session.delete(orch_request_ref)
|
|
|
|
|
|
@require_admin_context
|
|
def orch_request_delete_by_subcloud(context, region_name):
|
|
"""Delete all orch_request entries for a given subcloud.
|
|
|
|
This is used primarily when deleting a subcloud.
|
|
In particular, it is not a bug if there are no entries to delete.
|
|
"""
|
|
with write_session() as session:
|
|
session.query(models.OrchRequest). \
|
|
filter_by(target_region_name=region_name). \
|
|
delete()
|
|
|
|
|
|
@require_admin_context
|
|
def orch_request_delete_previous_failed_requests(context, delete_timestamp):
|
|
"""Soft delete orch_request entries.
|
|
|
|
This is used to soft delete all previously failed requests at
|
|
the end of each audit cycle.
|
|
"""
|
|
LOG.info('Soft deleting failed orch requests at and before %s',
|
|
delete_timestamp)
|
|
with write_session() as session:
|
|
query = session.query(models.OrchRequest). \
|
|
filter_by(deleted=0). \
|
|
filter_by(state=consts.ORCH_REQUEST_STATE_FAILED). \
|
|
filter(models.OrchRequest.updated_at <= delete_timestamp)
|
|
|
|
count = query.update({'deleted': 1,
|
|
'deleted_at': timeutils.utcnow()})
|
|
LOG.info('%d previously failed sync requests soft deleted', count)
|
|
|
|
|
|
@require_admin_context
|
|
def purge_deleted_records(context, age_in_days):
|
|
deleted_age = \
|
|
timeutils.utcnow() - datetime.timedelta(days=age_in_days)
|
|
|
|
LOG.info('Purging deleted records older than %s', deleted_age)
|
|
|
|
with write_session() as session:
|
|
# Purging orch_request table
|
|
count = session.query(models.OrchRequest). \
|
|
filter_by(deleted=1). \
|
|
filter(models.OrchRequest.deleted_at < deleted_age).delete()
|
|
LOG.info('%d records were purged from orch_request table.', count)
|
|
|
|
# Purging orch_job table
|
|
subquery = model_query(context, models.OrchRequest.orch_job_id). \
|
|
group_by(models.OrchRequest.orch_job_id)
|
|
|
|
count = session.query(models.OrchJob). \
|
|
filter(~models.OrchJob.id.in_(subquery)). \
|
|
delete(synchronize_session='fetch')
|
|
LOG.info('%d records were purged from orch_job table.', count)
|
|
|
|
# Purging resource table
|
|
orchjob_subquery = model_query(context, models.OrchJob.resource_id). \
|
|
group_by(models.OrchJob.resource_id)
|
|
|
|
subcloud_resource_subquery = model_query(
|
|
context, models.SubcloudResource.resource_id). \
|
|
group_by(models.SubcloudResource.resource_id)
|
|
|
|
count = session.query(models.Resource). \
|
|
filter(~models.Resource.id.in_(orchjob_subquery)). \
|
|
filter(~models.Resource.id.in_(subcloud_resource_subquery)). \
|
|
delete(synchronize_session='fetch')
|
|
LOG.info('%d records were purged from resource table.', count)
|
|
|
|
|
|
def sync_lock_acquire(
|
|
context, engine_id, subcloud_name, endpoint_type, action):
|
|
LOG.debug("sync_lock_acquire: %s/%s/%s/%s" % (engine_id, subcloud_name,
|
|
endpoint_type, action))
|
|
with write_session() as session:
|
|
lock = session.query(models.SyncLock). \
|
|
filter_by(deleted=0). \
|
|
filter_by(subcloud_name=subcloud_name). \
|
|
filter_by(endpoint_type=endpoint_type). \
|
|
filter_by(action=action).all()
|
|
if not lock:
|
|
lock_ref = models.SyncLock()
|
|
lock_ref.engine_id = engine_id
|
|
lock_ref.subcloud_name = subcloud_name
|
|
lock_ref.endpoint_type = endpoint_type
|
|
lock_ref.action = action
|
|
try:
|
|
session.add(lock_ref)
|
|
return True
|
|
except IntegrityError:
|
|
LOG.info("IntegrityError Engine id:%s, subcloud:%s, "
|
|
"endpoint_type:%s" %
|
|
(engine_id, subcloud_name, endpoint_type))
|
|
except db_exc.DBDuplicateEntry:
|
|
LOG.info("DBDuplicateEntry Engine id:%s, subcloud:%s, "
|
|
"endpoint_type:%s" %
|
|
(engine_id, subcloud_name, endpoint_type))
|
|
except Exception:
|
|
LOG.exception("Got session add exception")
|
|
return False
|
|
|
|
|
|
# For robustness, this will attempt max_retries with inc_retry_interval
|
|
# backoff to release the sync_lock.
|
|
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
|
|
retry_interval=0.5, inc_retry_interval=True)
|
|
def sync_lock_release(context, subcloud_name, endpoint_type, action):
|
|
with write_session() as session:
|
|
session.query(models.SyncLock).filter_by(
|
|
subcloud_name=subcloud_name). \
|
|
filter_by(endpoint_type=endpoint_type). \
|
|
filter_by(action=action). \
|
|
delete(synchronize_session='fetch')
|
|
|
|
|
|
def sync_lock_steal(context, engine_id, subcloud_name, endpoint_type, action):
|
|
sync_lock_release(context, subcloud_name, endpoint_type, action)
|
|
return sync_lock_acquire(context, engine_id, subcloud_name, endpoint_type,
|
|
action)
|
|
|
|
|
|
def sync_lock_delete_by_engine_id(context, engine_id):
|
|
"""Delete all sync_lock entries for a given engine."""
|
|
|
|
with write_session() as session:
|
|
results = session.query(models.SyncLock). \
|
|
filter_by(engine_id=engine_id).all()
|
|
for result in results:
|
|
LOG.info("Deleted sync lock id=%s engine_id=%s" %
|
|
(result.id, result.engine_id))
|
|
session.delete(result)
|
|
|
|
|
|
def purge_stale_sync_lock(context):
|
|
"""Delete all sync lock entries where service ID no longer exists."""
|
|
LOG.info('Purging stale sync_locks')
|
|
with write_session() as session:
|
|
# Purging sync_lock table
|
|
subquery = model_query(context, models.Service.id). \
|
|
group_by(models.Service.id)
|
|
|
|
count = session.query(models.SyncLock). \
|
|
filter(~models.SyncLock.engine_id.in_(subquery)). \
|
|
delete(synchronize_session='fetch')
|
|
LOG.info('%d records were purged from sync_lock table.', count)
|
|
|
|
|
|
def _subcloud_sync_get(context, subcloud_name, endpoint_type, session=None):
|
|
query = model_query(context, models.SubcloudSync, session=session). \
|
|
filter_by(subcloud_name=subcloud_name). \
|
|
filter_by(endpoint_type=endpoint_type)
|
|
try:
|
|
return query.one()
|
|
except NoResultFound:
|
|
raise exception.SubcloudSyncNotFound(subcloud_name=subcloud_name,
|
|
endpoint_type=endpoint_type)
|
|
except MultipleResultsFound:
|
|
err = ("Multiple entries found for subcloud %s endpoint_type %s" %
|
|
(subcloud_name, endpoint_type))
|
|
raise exception.InvalidParameterValue(err=err)
|
|
|
|
|
|
def subcloud_sync_get(context, subcloud_name, endpoint_type):
|
|
return _subcloud_sync_get(context, subcloud_name, endpoint_type)
|
|
|
|
|
|
def subcloud_sync_create(context, subcloud_name, endpoint_type, values):
|
|
with write_session() as session:
|
|
result = models.SubcloudSync()
|
|
result.subcloud_name = subcloud_name
|
|
result.endpoint_type = endpoint_type
|
|
result.update(values)
|
|
try:
|
|
session.add(result)
|
|
except db_exc.DBDuplicateEntry:
|
|
raise exception.SubcloudSyncAlreadyExists(
|
|
subcloud_name=subcloud_name,
|
|
endpoint_type=endpoint_type)
|
|
return result
|
|
|
|
|
|
def subcloud_sync_update(context, subcloud_name, endpoint_type, values):
|
|
with write_session() as session:
|
|
result = _subcloud_sync_get(context, subcloud_name, endpoint_type,
|
|
session)
|
|
result.update(values)
|
|
result.save(session)
|
|
return result
|
|
|
|
|
|
def subcloud_sync_delete(context, subcloud_name, endpoint_type):
|
|
with write_session() as session:
|
|
results = session.query(models.SubcloudSync). \
|
|
filter_by(subcloud_name=subcloud_name). \
|
|
filter_by(endpoint_type=endpoint_type).all()
|
|
for result in results:
|
|
session.delete(result)
|