# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Implementation of SQLAlchemy backend."""

import collections
import copy
import datetime
import functools
import inspect
import sys

from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from six.moves import range
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import Boolean
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.orm import aliased
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import joinedload_all
from sqlalchemy.orm import noload
from sqlalchemy.orm import undefer
from sqlalchemy.schema import Table
from sqlalchemy import sql
from sqlalchemy.sql.expression import asc
from sqlalchemy.sql.expression import cast
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import false
from sqlalchemy.sql import func
from sqlalchemy.sql import null
from sqlalchemy.sql import true

from nova import block_device
from nova.compute import task_states
from nova.compute import vm_states
import nova.conf
import nova.context
from nova.db.sqlalchemy import models
from nova import exception
from nova.i18n import _
from nova import safe_utils

profiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')

CONF = nova.conf.CONF


LOG = logging.getLogger(__name__)

main_context_manager = enginefacade.transaction_context()
api_context_manager = enginefacade.transaction_context()


def _get_db_conf(conf_group, connection=None):
    kw = dict(conf_group.items())
    if connection is not None:
        kw['connection'] = connection
    return kw


def _context_manager_from_context(context):
    if context:
        try:
            return context.db_connection
        except AttributeError:
            pass


def configure(conf):
    main_context_manager.configure(**_get_db_conf(conf.database))
    api_context_manager.configure(**_get_db_conf(conf.api_database))

    if profiler_sqlalchemy and CONF.profiler.enabled \
            and CONF.profiler.trace_sqlalchemy:

        main_context_manager.append_on_engine_create(
            lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))
        api_context_manager.append_on_engine_create(
            lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))


def create_context_manager(connection=None):
    """Create a database context manager object.

    :param connection: The database connection string
    """
    ctxt_mgr = enginefacade.transaction_context()
    ctxt_mgr.configure(**_get_db_conf(CONF.database, connection=connection))
    return ctxt_mgr
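
# NOTE: illustrative sketch, not part of the original module. It shows how
# create_context_manager() can be pointed at an alternate database; the
# connection URL below is a hypothetical example.
#
#     cell_mgr = create_context_manager(
#         'mysql+pymysql://nova:secret@cell1-db/nova')
#     with cell_mgr.reader.using(context):
#         # queries issued through context.session now hit that database
#         ...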


def get_context_manager(context):
    """Get a database context manager object.

    :param context: The request context that can contain a context manager
    """
    return _context_manager_from_context(context) or main_context_manager


def get_engine(use_slave=False, context=None):
    """Get a database engine object.

    :param use_slave: Whether to use the slave connection
    :param context: The request context that can contain a context manager
    """
    ctxt_mgr = get_context_manager(context)
    return ctxt_mgr.get_legacy_facade().get_engine(use_slave=use_slave)


def get_api_engine():
    return api_context_manager.get_legacy_facade().get_engine()


_SHADOW_TABLE_PREFIX = 'shadow_'
_DEFAULT_QUOTA_NAME = 'default'
PER_PROJECT_QUOTAS = ['fixed_ips', 'floating_ips', 'networks']


def get_backend():
    """The backend is this module itself."""
    return sys.modules[__name__]


def require_context(f):
    """Decorator to require *any* user or admin context.

    This does no authorization for user or project access matching, see
    :py:func:`nova.context.authorize_project_context` and
    :py:func:`nova.context.authorize_user_context`.

    The first argument to the wrapped function must be the context.

    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        nova.context.require_context(args[0])
        return f(*args, **kwargs)
    return wrapper


def require_instance_exists_using_uuid(f):
    """Decorator to require the specified instance to exist.

    Requires the wrapped function to use context and instance_uuid as
    their first two arguments.
    """
    @functools.wraps(f)
    def wrapper(context, instance_uuid, *args, **kwargs):
        instance_get_by_uuid(context, instance_uuid)
        return f(context, instance_uuid, *args, **kwargs)

    return wrapper


def require_aggregate_exists(f):
    """Decorator to require the specified aggregate to exist.

    Requires the wrapped function to use context and aggregate_id as
    their first two arguments.
    """

    @functools.wraps(f)
    def wrapper(context, aggregate_id, *args, **kwargs):
        aggregate_get(context, aggregate_id)
        return f(context, aggregate_id, *args, **kwargs)
    return wrapper


def select_db_reader_mode(f):
    """Decorator to select synchronous or asynchronous reader mode.

    The 'use_slave' keyword argument defines the reader mode. The
    asynchronous reader is used if 'use_slave' is True, the synchronous
    reader otherwise. If 'use_slave' is not specified, it defaults to False.

    Wrapped function must have a context in the arguments.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        wrapped_func = safe_utils.get_wrapped_function(f)
        keyed_args = inspect.getcallargs(wrapped_func, *args, **kwargs)

        context = keyed_args['context']
        use_slave = keyed_args.get('use_slave', False)

        if use_slave:
            reader_mode = get_context_manager(context).async
        else:
            reader_mode = get_context_manager(context).reader

        with reader_mode.using(context):
            return f(*args, **kwargs)
    return wrapper
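
# NOTE: illustrative sketch, not part of the original module. A DB API
# function decorated with select_db_reader_mode() accepts a 'use_slave'
# kwarg and the decorator routes the read accordingly; the function name
# below is hypothetical.
#
#     @select_db_reader_mode
#     def hypothetical_service_get(context, service_id, use_slave=False):
#         return model_query(context, models.Service).\
#             filter_by(id=service_id).first()
#
#     hypothetical_service_get(ctxt, 123, use_slave=True)  # async reader
#     hypothetical_service_get(ctxt, 123)                  # sync reader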


def pick_context_manager_writer(f):
    """Decorator to use a writer db context manager.

    The db context manager will be picked from the RequestContext.

    Wrapped function must have a RequestContext in the arguments.
    """
    @functools.wraps(f)
    def wrapped(context, *args, **kwargs):
        ctxt_mgr = get_context_manager(context)
        with ctxt_mgr.writer.using(context):
            return f(context, *args, **kwargs)
    return wrapped


def pick_context_manager_reader(f):
    """Decorator to use a reader db context manager.

    The db context manager will be picked from the RequestContext.

    Wrapped function must have a RequestContext in the arguments.
    """
    @functools.wraps(f)
    def wrapped(context, *args, **kwargs):
        ctxt_mgr = get_context_manager(context)
        with ctxt_mgr.reader.using(context):
            return f(context, *args, **kwargs)
    return wrapped


def pick_context_manager_reader_allow_async(f):
    """Decorator to use a reader.allow_async db context manager.

    The db context manager will be picked from the RequestContext.

    Wrapped function must have a RequestContext in the arguments.
    """
    @functools.wraps(f)
    def wrapped(context, *args, **kwargs):
        ctxt_mgr = get_context_manager(context)
        with ctxt_mgr.reader.allow_async.using(context):
            return f(context, *args, **kwargs)
    return wrapped


def model_query(context, model,
                args=None,
                read_deleted=None,
                project_only=False):
    """Query helper that accounts for context's `read_deleted` field.

    :param context: NovaContext of the query.
    :param model: Model to query. Must be a subclass of ModelBase.
    :param args: Arguments to query. If None - model is used.
    :param read_deleted: If not None, overrides context's read_deleted field.
                         Permitted values are 'no', which does not return
                         deleted values; 'only', which only returns deleted
                         values; and 'yes', which does not filter deleted
                         values.
    :param project_only: If set and context is user-type, then restrict
                         query to match the context's project_id. If set to
                         'allow_none', restriction includes project_id = None.
    """

    if read_deleted is None:
        read_deleted = context.read_deleted

    query_kwargs = {}
    if 'no' == read_deleted:
        query_kwargs['deleted'] = False
    elif 'only' == read_deleted:
        query_kwargs['deleted'] = True
    elif 'yes' == read_deleted:
        pass
    else:
        raise ValueError(_("Unrecognized read_deleted value '%s'")
                         % read_deleted)

    query = sqlalchemyutils.model_query(
        model, context.session, args, **query_kwargs)

    # We can't use oslo.db model_query's project_id here, as it doesn't allow
    # us to return both our projects and unowned projects.
    if nova.context.is_user_context(context) and project_only:
        if project_only == 'allow_none':
            query = query.\
                filter(or_(model.project_id == context.project_id,
                           model.project_id == null()))
        else:
            query = query.filter_by(project_id=context.project_id)

    return query
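
# NOTE: illustrative sketch, not part of the original module, showing the
# effect of read_deleted and project_only inside a reader block; the function
# name is hypothetical.
#
#     @pick_context_manager_reader
#     def hypothetical_list_instances(context):
#         # Excludes soft-deleted rows and limits rows to context.project_id.
#         return model_query(context, models.Instance, read_deleted='no',
#                            project_only=True).all()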


def convert_objects_related_datetimes(values, *datetime_keys):
    if not datetime_keys:
        datetime_keys = ('created_at', 'deleted_at', 'updated_at')

    for key in datetime_keys:
        if key in values and values[key]:
            if isinstance(values[key], six.string_types):
                try:
                    values[key] = timeutils.parse_strtime(values[key])
                except ValueError:
                    # Fall back to ISO 8601 parsing, since parse_strtime
                    # cannot handle offsets such as
                    # '2015-05-28T19:59:38+00:00'
                    values[key] = timeutils.parse_isotime(values[key])
            # NOTE(danms): Strip UTC timezones from datetimes, since they're
            # stored that way in the database
            values[key] = values[key].replace(tzinfo=None)
    return values
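
# NOTE: illustrative sketch, not part of the original module. Both string
# forms below are normalized in place to naive (timezone-free) datetimes:
#
#     values = {'created_at': '2015-05-28T19:59:38.000000',
#               'deleted_at': '2015-05-28T19:59:38+00:00'}
#     convert_objects_related_datetimes(values, 'created_at', 'deleted_at')
#     # values now holds datetime.datetime objects with tzinfo=None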


###################


def constraint(**conditions):
    return Constraint(conditions)


def equal_any(*values):
    return EqualityCondition(values)


def not_equal(*values):
    return InequalityCondition(values)


class Constraint(object):

    def __init__(self, conditions):
        self.conditions = conditions

    def apply(self, model, query):
        for key, condition in self.conditions.items():
            for clause in condition.clauses(getattr(model, key)):
                query = query.filter(clause)
        return query


class EqualityCondition(object):

    def __init__(self, values):
        self.values = values

    def clauses(self, field):
        # method signature requires us to return an iterable even if for OR
        # operator this will actually be a single clause
        return [or_(*[field == value for value in self.values])]


class InequalityCondition(object):

    def __init__(self, values):
        self.values = values

    def clauses(self, field):
        return [field != value for value in self.values]
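
# NOTE: illustrative sketch, not part of the original module. These helpers
# build a Constraint that callers such as instance_destroy() can apply to
# guard a delete against concurrent state changes:
#
#     cons = constraint(vm_state=equal_any(vm_states.STOPPED),
#                       task_state=not_equal(task_states.RESIZE_MIGRATING))
#     instance_destroy(context, instance_uuid, constraint=cons)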


class DeleteFromSelect(UpdateBase):
    def __init__(self, table, select, column):
        self.table = table
        self.select = select
        self.column = column


# NOTE(guochbo): some versions of MySQL don't yet support subqueries that
# combine LIMIT with IN/ALL/ANY/SOME, so we work around it by nesting the
# SELECT.
@compiles(DeleteFromSelect)
def visit_delete_from_select(element, compiler, **kw):
    return "DELETE FROM %s WHERE %s in (SELECT T1.%s FROM (%s) as T1)" % (
        compiler.process(element.table, asfrom=True),
        compiler.process(element.column),
        element.column.name,
        compiler.process(element.select))
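
# NOTE: illustrative sketch, not part of the original module. A
# DeleteFromSelect compiles to a nested-SELECT DELETE, schematically:
#
#     DELETE FROM instances WHERE id in (
#         SELECT T1.id FROM (SELECT id FROM instances LIMIT 10) as T1)
#
# which sidesteps the MySQL limitation described above. A caller would build
# it roughly as (names hypothetical):
#
#     table = models.Instance.__table__
#     sel = sql.select([table.c.id]).limit(10)
#     conn.execute(DeleteFromSelect(table, sel, table.c.id))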


###################


@pick_context_manager_writer
def service_destroy(context, service_id):
    service = service_get(context, service_id)

    model_query(context, models.Service).\
        filter_by(id=service_id).\
        soft_delete(synchronize_session=False)

    if service.binary == 'nova-compute':
        # TODO(sbauza): Remove the service_id filter in a later release
        # once we are sure that all compute nodes report the host field
        model_query(context, models.ComputeNode).\
            filter(or_(models.ComputeNode.service_id == service_id,
                       models.ComputeNode.host == service['host'])).\
            soft_delete(synchronize_session=False)


@pick_context_manager_reader
def service_get(context, service_id):
    query = model_query(context, models.Service).filter_by(id=service_id)

    result = query.first()
    if not result:
        raise exception.ServiceNotFound(service_id=service_id)

    return result


@pick_context_manager_reader
def service_get_by_uuid(context, service_uuid):
    query = model_query(context, models.Service).filter_by(uuid=service_uuid)

    result = query.first()
    if not result:
        raise exception.ServiceNotFound(service_id=service_uuid)

    return result


@pick_context_manager_reader_allow_async
def service_get_minimum_version(context, binaries):
    min_versions = context.session.query(
        models.Service.binary,
        func.min(models.Service.version)).\
        filter(models.Service.binary.in_(binaries)).\
        filter(models.Service.deleted == 0).\
        filter(models.Service.forced_down == false()).\
        group_by(models.Service.binary)
    return dict(min_versions)
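
# NOTE: illustrative sketch, not part of the original module. The result maps
# each requested binary to the lowest version reported by its not-deleted,
# not-forced-down services; the numbers below are hypothetical:
#
#     service_get_minimum_version(ctxt, ['nova-compute', 'nova-conductor'])
#     # -> {'nova-compute': 16, 'nova-conductor': 16}
#
# A binary with no matching rows is simply absent from the dict.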


@pick_context_manager_reader
def service_get_all(context, disabled=None):
    query = model_query(context, models.Service)

    if disabled is not None:
        query = query.filter_by(disabled=disabled)

    return query.all()


@pick_context_manager_reader
def service_get_all_by_topic(context, topic):
    return model_query(context, models.Service, read_deleted="no").\
        filter_by(disabled=False).\
        filter_by(topic=topic).\
        all()


@pick_context_manager_reader
def service_get_by_host_and_topic(context, host, topic):
    return model_query(context, models.Service, read_deleted="no").\
        filter_by(disabled=False).\
        filter_by(host=host).\
        filter_by(topic=topic).\
        first()


@pick_context_manager_reader
def service_get_all_by_binary(context, binary, include_disabled=False):
    query = model_query(context, models.Service).filter_by(binary=binary)
    if not include_disabled:
        query = query.filter_by(disabled=False)
    return query.all()


@pick_context_manager_reader
def service_get_all_computes_by_hv_type(context, hv_type,
                                        include_disabled=False):
    query = model_query(context, models.Service, read_deleted="no").\
        filter_by(binary='nova-compute')
    if not include_disabled:
        query = query.filter_by(disabled=False)
    query = query.join(models.ComputeNode,
                       models.Service.host == models.ComputeNode.host).\
        filter(models.ComputeNode.hypervisor_type == hv_type).\
        distinct('host')
    return query.all()


@pick_context_manager_reader
def service_get_by_host_and_binary(context, host, binary):
    result = model_query(context, models.Service, read_deleted="no").\
        filter_by(host=host).\
        filter_by(binary=binary).\
        first()

    if not result:
        raise exception.HostBinaryNotFound(host=host, binary=binary)

    return result


@pick_context_manager_reader
def service_get_all_by_host(context, host):
    return model_query(context, models.Service, read_deleted="no").\
        filter_by(host=host).\
        all()


@pick_context_manager_reader_allow_async
def service_get_by_compute_host(context, host):
    result = model_query(context, models.Service, read_deleted="no").\
        filter_by(host=host).\
        filter_by(binary='nova-compute').\
        first()

    if not result:
        raise exception.ComputeHostNotFound(host=host)

    return result


@pick_context_manager_writer
def service_create(context, values):
    service_ref = models.Service()
    service_ref.update(values)
    # We only auto-disable nova-compute services since those are the only
    # ones that can be enabled using the os-services REST API and they are
    # the only ones where being disabled means anything. It does
    # not make sense to be able to disable non-compute services like
    # nova-scheduler or nova-osapi_compute since that does nothing.
    if (not CONF.enable_new_services and
            values.get('binary') == 'nova-compute'):
        msg = _("New compute service disabled due to config option.")
        service_ref.disabled = True
        service_ref.disabled_reason = msg
    try:
        service_ref.save(context.session)
    except db_exc.DBDuplicateEntry as e:
        if 'binary' in e.columns:
            raise exception.ServiceBinaryExists(host=values.get('host'),
                                                binary=values.get('binary'))
        raise exception.ServiceTopicExists(host=values.get('host'),
                                           topic=values.get('topic'))
    return service_ref


@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def service_update(context, service_id, values):
    service_ref = service_get(context, service_id)
    # Only servicegroup.drivers.db.DbDriver._report_state() updates
    # 'report_count', so if that value changes then store the timestamp
    # as the last time we got a state report.
    if 'report_count' in values:
        if values['report_count'] > service_ref.report_count:
            service_ref.last_seen_up = timeutils.utcnow()
    service_ref.update(values)

    return service_ref


###################


def _compute_node_select(context, filters=None, limit=None, marker=None):
    if filters is None:
        filters = {}

    cn_tbl = sa.alias(models.ComputeNode.__table__, name='cn')
    select = sa.select([cn_tbl])

    if context.read_deleted == "no":
        select = select.where(cn_tbl.c.deleted == 0)
    if "compute_id" in filters:
        select = select.where(cn_tbl.c.id == filters["compute_id"])
    if "service_id" in filters:
        select = select.where(cn_tbl.c.service_id == filters["service_id"])
    if "host" in filters:
        select = select.where(cn_tbl.c.host == filters["host"])
    if "hypervisor_hostname" in filters:
        hyp_hostname = filters["hypervisor_hostname"]
        select = select.where(cn_tbl.c.hypervisor_hostname == hyp_hostname)
    if "mapped" in filters:
        select = select.where(cn_tbl.c.mapped < filters['mapped'])
    if marker is not None:
        try:
            compute_node_get(context, marker)
        except exception.ComputeHostNotFound:
            raise exception.MarkerNotFound(marker=marker)
        select = select.where(cn_tbl.c.id > marker)
    if limit is not None:
        select = select.limit(limit)
    # Explicitly order by id, so we're not dependent on the native sort
    # order of the underlying DB.
    select = select.order_by(asc("id"))
    return select


def _compute_node_fetchall(context, filters=None, limit=None, marker=None):
    select = _compute_node_select(context, filters, limit=limit,
                                  marker=marker)
    engine = get_engine(context=context)
    conn = engine.connect()

    results = conn.execute(select).fetchall()

    # Callers expect dict-like objects, not SQLAlchemy RowProxy objects...
    results = [dict(r) for r in results]
    conn.close()
    return results
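
# NOTE: illustrative sketch, not part of the original module. The filters,
# limit and marker arguments combine for keyset pagination over compute
# nodes; the values below are hypothetical:
#
#     page = _compute_node_fetchall(ctxt, filters={'mapped': 1},
#                                   limit=50, marker=last_seen_id)
#     # 'page' is a list of plain dicts ordered by id, each with id > marker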


@pick_context_manager_reader
def compute_node_get(context, compute_id):
    results = _compute_node_fetchall(context, {"compute_id": compute_id})
    if not results:
        raise exception.ComputeHostNotFound(host=compute_id)
    return results[0]


@pick_context_manager_reader
def compute_node_get_model(context, compute_id):
    # TODO(edleafe): remove once the compute node resource provider migration
    # is complete, and this distinction is no longer necessary.
    result = model_query(context, models.ComputeNode).\
        filter_by(id=compute_id).\
        first()
    if not result:
        raise exception.ComputeHostNotFound(host=compute_id)
    return result


@pick_context_manager_reader
def compute_nodes_get_by_service_id(context, service_id):
    results = _compute_node_fetchall(context, {"service_id": service_id})
    if not results:
        raise exception.ServiceNotFound(service_id=service_id)
    return results


@pick_context_manager_reader
def compute_node_get_by_host_and_nodename(context, host, nodename):
    results = _compute_node_fetchall(context,
            {"host": host, "hypervisor_hostname": nodename})
    if not results:
        raise exception.ComputeHostNotFound(host=host)
    return results[0]


@pick_context_manager_reader_allow_async
def compute_node_get_all_by_host(context, host):
    results = _compute_node_fetchall(context, {"host": host})
    if not results:
        raise exception.ComputeHostNotFound(host=host)
    return results


@pick_context_manager_reader
def compute_node_get_all(context):
    return _compute_node_fetchall(context)


@pick_context_manager_reader
def compute_node_get_all_mapped_less_than(context, mapped_less_than):
    return _compute_node_fetchall(context,
                                  {'mapped': mapped_less_than})


@pick_context_manager_reader
def compute_node_get_all_by_pagination(context, limit=None, marker=None):
    return _compute_node_fetchall(context, limit=limit, marker=marker)


@pick_context_manager_reader
def compute_node_search_by_hypervisor(context, hypervisor_match):
    field = models.ComputeNode.hypervisor_hostname
    return model_query(context, models.ComputeNode).\
        filter(field.like('%%%s%%' % hypervisor_match)).\
        all()


@pick_context_manager_writer
def compute_node_create(context, values):
    """Creates a new ComputeNode and populates the capacity fields
    with the most recent data.
    """
    convert_objects_related_datetimes(values)

    compute_node_ref = models.ComputeNode()
    compute_node_ref.update(values)
    compute_node_ref.save(context.session)

    return compute_node_ref


@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def compute_node_update(context, compute_id, values):
    """Updates the ComputeNode record with the most recent data."""

    compute_ref = compute_node_get_model(context, compute_id)
    # Always update this, even if there's going to be no other
    # changes in data. This ensures that we invalidate the
    # scheduler cache of compute node data in case of races.
    values['updated_at'] = timeutils.utcnow()
    convert_objects_related_datetimes(values)
    compute_ref.update(values)

    return compute_ref


@pick_context_manager_writer
def compute_node_delete(context, compute_id):
    """Delete a ComputeNode record."""
    result = model_query(context, models.ComputeNode).\
        filter_by(id=compute_id).\
        soft_delete(synchronize_session=False)

    if not result:
        raise exception.ComputeHostNotFound(host=compute_id)


@pick_context_manager_reader
def compute_node_statistics(context):
    """Compute statistics over all compute nodes."""
    engine = get_engine(context=context)
    services_tbl = models.Service.__table__

    inner_sel = sa.alias(_compute_node_select(context), name='inner_sel')

    # TODO(sbauza): Remove the service_id filter in a later release
    # once we are sure that all compute nodes report the host field
    j = sa.join(
        inner_sel, services_tbl,
        sql.and_(
            sql.or_(
                inner_sel.c.host == services_tbl.c.host,
                inner_sel.c.service_id == services_tbl.c.id
            ),
            services_tbl.c.disabled == false(),
            services_tbl.c.binary == 'nova-compute',
            services_tbl.c.deleted == 0
        )
    )

    # NOTE(jaypipes): This COALESCE() stuff is temporary while the data
    # migration to the new resource providers inventories and allocations
    # tables is completed.
    agg_cols = [
        func.count().label('count'),
        sql.func.sum(
            inner_sel.c.vcpus
        ).label('vcpus'),
        sql.func.sum(
            inner_sel.c.memory_mb
        ).label('memory_mb'),
        sql.func.sum(
            inner_sel.c.local_gb
        ).label('local_gb'),
        sql.func.sum(
            inner_sel.c.vcpus_used
        ).label('vcpus_used'),
        sql.func.sum(
            inner_sel.c.memory_mb_used
        ).label('memory_mb_used'),
        sql.func.sum(
            inner_sel.c.local_gb_used
        ).label('local_gb_used'),
        sql.func.sum(
            inner_sel.c.free_ram_mb
        ).label('free_ram_mb'),
        sql.func.sum(
            inner_sel.c.free_disk_gb
        ).label('free_disk_gb'),
        sql.func.sum(
            inner_sel.c.current_workload
        ).label('current_workload'),
        sql.func.sum(
            inner_sel.c.running_vms
        ).label('running_vms'),
        sql.func.sum(
            inner_sel.c.disk_available_least
        ).label('disk_available_least'),
    ]
    select = sql.select(agg_cols).select_from(j)
    conn = engine.connect()

    results = conn.execute(select).fetchone()

    # Build a dict of the info--making no assumptions about result
    fields = ('count', 'vcpus', 'memory_mb', 'local_gb', 'vcpus_used',
              'memory_mb_used', 'local_gb_used', 'free_ram_mb',
              'free_disk_gb', 'current_workload', 'running_vms',
              'disk_available_least')
    results = {field: int(results[idx] or 0)
               for idx, field in enumerate(fields)}
    conn.close()
    return results


###################


@pick_context_manager_writer
def certificate_create(context, values):
    certificate_ref = models.Certificate()
    for (key, value) in values.items():
        certificate_ref[key] = value
    certificate_ref.save(context.session)
    return certificate_ref


@pick_context_manager_reader
def certificate_get_all_by_project(context, project_id):
    return model_query(context, models.Certificate, read_deleted="no").\
        filter_by(project_id=project_id).\
        all()


@pick_context_manager_reader
def certificate_get_all_by_user(context, user_id):
    return model_query(context, models.Certificate, read_deleted="no").\
        filter_by(user_id=user_id).\
        all()


@pick_context_manager_reader
def certificate_get_all_by_user_and_project(context, user_id, project_id):
    return model_query(context, models.Certificate, read_deleted="no").\
        filter_by(user_id=user_id).\
        filter_by(project_id=project_id).\
        all()


###################


@require_context
@pick_context_manager_reader
def floating_ip_get(context, id):
    try:
        result = model_query(context, models.FloatingIp, project_only=True).\
            filter_by(id=id).\
            options(joinedload_all('fixed_ip.instance')).\
            first()

        if not result:
            raise exception.FloatingIpNotFound(id=id)
    except db_exc.DBError:
        LOG.warning("Invalid floating IP ID %s in request", id)
        raise exception.InvalidID(id=id)
    return result


@require_context
@pick_context_manager_reader
def floating_ip_get_pools(context):
    pools = []
    for result in model_query(context, models.FloatingIp,
                              (models.FloatingIp.pool,)).distinct():
        pools.append({'name': result[0]})
    return pools


@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def floating_ip_allocate_address(context, project_id, pool,
                                 auto_assigned=False):
    nova.context.authorize_project_context(context, project_id)
    floating_ip_ref = model_query(context, models.FloatingIp,
                                  read_deleted="no").\
        filter_by(fixed_ip_id=None).\
        filter_by(project_id=None).\
        filter_by(pool=pool).\
        first()

    if not floating_ip_ref:
        raise exception.NoMoreFloatingIps()

    params = {'project_id': project_id, 'auto_assigned': auto_assigned}

    rows_update = model_query(context, models.FloatingIp,
                              read_deleted="no").\
        filter_by(id=floating_ip_ref['id']).\
        filter_by(fixed_ip_id=None).\
        filter_by(project_id=None).\
        filter_by(pool=pool).\
        update(params, synchronize_session='evaluate')

    if not rows_update:
        LOG.debug('The row was updated in a concurrent transaction, '
                  'we will fetch another one')
        raise db_exc.RetryRequest(exception.FloatingIpAllocateFailed())

    return floating_ip_ref['address']
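
# NOTE: not part of the original module; a short summary of the pattern
# above. The allocation is a compare-and-swap: the candidate row is
# re-filtered on the same "still free" criteria inside the UPDATE, so a
# concurrent allocator that grabbed the row first leaves rows_update == 0,
# and the db_exc.RetryRequest makes wrap_db_retry() re-run the whole
# function to pick a different row.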


@require_context
@pick_context_manager_writer
def floating_ip_bulk_create(context, ips, want_result=True):
    try:
        tab = models.FloatingIp().__table__
        context.session.execute(tab.insert(), ips)
    except db_exc.DBDuplicateEntry as e:
        raise exception.FloatingIpExists(address=e.value)

    if want_result:
        return model_query(context, models.FloatingIp).filter(
            models.FloatingIp.address.in_(
                [ip['address'] for ip in ips])).all()


def _ip_range_splitter(ips, block_size=256):
    """Yields blocks of IPs no more than block_size elements long."""
    out = []
    count = 0
    for ip in ips:
        out.append(ip['address'])
        count += 1

        if count > block_size - 1:
            yield out
            out = []
            count = 0

    if out:
        yield out
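
# NOTE: illustrative sketch, not part of the original module:
#
#     ips = [{'address': '10.0.0.%d' % i} for i in range(600)]
#     [len(block) for block in _ip_range_splitter(ips)]  # -> [256, 256, 88]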


@require_context
@pick_context_manager_writer
def floating_ip_bulk_destroy(context, ips):
    project_id_to_quota_count = collections.defaultdict(int)
    for ip_block in _ip_range_splitter(ips):
        # Find any floating IPs that were not auto_assigned and
        # thus need quota released.
        query = model_query(context, models.FloatingIp).\
            filter(models.FloatingIp.address.in_(ip_block)).\
            filter_by(auto_assigned=False)
        for row in query.all():
            # The count is negative since we release quota by
            # reserving negative quota.
            project_id_to_quota_count[row['project_id']] -= 1
        # Delete the floating IPs.
        model_query(context, models.FloatingIp).\
            filter(models.FloatingIp.address.in_(ip_block)).\
            soft_delete(synchronize_session='fetch')


@require_context
@pick_context_manager_writer
def floating_ip_create(context, values):
    floating_ip_ref = models.FloatingIp()
    floating_ip_ref.update(values)
    try:
        floating_ip_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        raise exception.FloatingIpExists(address=values['address'])
    return floating_ip_ref


def _floating_ip_count_by_project(context, project_id):
    nova.context.authorize_project_context(context, project_id)
    # TODO(tr3buchet): why leave auto_assigned floating IPs out?
    return model_query(context, models.FloatingIp, read_deleted="no").\
        filter_by(project_id=project_id).\
        filter_by(auto_assigned=False).\
        count()


@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def floating_ip_fixed_ip_associate(context, floating_address,
                                   fixed_address, host):
    fixed_ip_ref = model_query(context, models.FixedIp).\
        filter_by(address=fixed_address).\
        options(joinedload('network')).\
        first()
    if not fixed_ip_ref:
        raise exception.FixedIpNotFoundForAddress(address=fixed_address)
    rows = model_query(context, models.FloatingIp).\
        filter_by(address=floating_address).\
        filter(models.FloatingIp.project_id ==
               context.project_id).\
        filter(or_(models.FloatingIp.fixed_ip_id ==
                   fixed_ip_ref['id'],
                   models.FloatingIp.fixed_ip_id.is_(None))).\
        update({'fixed_ip_id': fixed_ip_ref['id'], 'host': host})

    if not rows:
        raise exception.FloatingIpAssociateFailed(address=floating_address)

    return fixed_ip_ref


@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def floating_ip_deallocate(context, address):
    return model_query(context, models.FloatingIp).\
        filter_by(address=address).\
        filter(and_(models.FloatingIp.project_id != null()),
               models.FloatingIp.fixed_ip_id == null()).\
        update({'project_id': None,
                'host': None,
                'auto_assigned': False},
               synchronize_session=False)


@require_context
@pick_context_manager_writer
def floating_ip_destroy(context, address):
    model_query(context, models.FloatingIp).\
        filter_by(address=address).\
        delete()


@require_context
@pick_context_manager_writer
def floating_ip_disassociate(context, address):
    floating_ip_ref = model_query(context,
                                  models.FloatingIp).\
        filter_by(address=address).\
        first()
    if not floating_ip_ref:
        raise exception.FloatingIpNotFoundForAddress(address=address)

    fixed_ip_ref = model_query(context, models.FixedIp).\
        filter_by(id=floating_ip_ref['fixed_ip_id']).\
        options(joinedload('network')).\
        first()
    floating_ip_ref.fixed_ip_id = None
    floating_ip_ref.host = None

    return fixed_ip_ref


def _floating_ip_get_all(context):
    return model_query(context, models.FloatingIp, read_deleted="no")


@pick_context_manager_reader
def floating_ip_get_all(context):
    floating_ip_refs = _floating_ip_get_all(context).\
        options(joinedload('fixed_ip')).\
        all()
    if not floating_ip_refs:
        raise exception.NoFloatingIpsDefined()
    return floating_ip_refs


@pick_context_manager_reader
def floating_ip_get_all_by_host(context, host):
    floating_ip_refs = _floating_ip_get_all(context).\
        filter_by(host=host).\
        options(joinedload('fixed_ip')).\
        all()
    if not floating_ip_refs:
        raise exception.FloatingIpNotFoundForHost(host=host)
    return floating_ip_refs


@require_context
@pick_context_manager_reader
def floating_ip_get_all_by_project(context, project_id):
    nova.context.authorize_project_context(context, project_id)
    # TODO(tr3buchet): why do we not want auto_assigned floating IPs here?
    return _floating_ip_get_all(context).\
        filter_by(project_id=project_id).\
        filter_by(auto_assigned=False).\
        options(joinedload_all('fixed_ip.instance')).\
        all()


@require_context
@pick_context_manager_reader
def floating_ip_get_by_address(context, address):
    return _floating_ip_get_by_address(context, address)


def _floating_ip_get_by_address(context, address):

    # if address string is empty explicitly set it to None
    if not address:
        address = None
    try:
        result = model_query(context, models.FloatingIp).\
            filter_by(address=address).\
            options(joinedload_all('fixed_ip.instance')).\
            first()

        if not result:
            raise exception.FloatingIpNotFoundForAddress(address=address)
    except db_exc.DBError:
        msg = _("Invalid floating IP %s in request") % address
        LOG.warning(msg)
        raise exception.InvalidIpAddressError(msg)

    # If the floating IP has a project ID set, check to make sure
    # the non-admin user has access.
    if result.project_id and nova.context.is_user_context(context):
        nova.context.authorize_project_context(context, result.project_id)

    return result


@require_context
@pick_context_manager_reader
def floating_ip_get_by_fixed_address(context, fixed_address):
    return model_query(context, models.FloatingIp).\
        outerjoin(models.FixedIp,
                  models.FixedIp.id ==
                  models.FloatingIp.fixed_ip_id).\
        filter(models.FixedIp.address == fixed_address).\
        all()


@require_context
@pick_context_manager_reader
def floating_ip_get_by_fixed_ip_id(context, fixed_ip_id):
    return model_query(context, models.FloatingIp).\
        filter_by(fixed_ip_id=fixed_ip_id).\
        all()


@require_context
@pick_context_manager_writer
def floating_ip_update(context, address, values):
    float_ip_ref = _floating_ip_get_by_address(context, address)
    float_ip_ref.update(values)
    try:
        float_ip_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        raise exception.FloatingIpExists(address=values['address'])
    return float_ip_ref


###################


@require_context
@pick_context_manager_reader
def dnsdomain_get(context, fqdomain):
    return model_query(context, models.DNSDomain, read_deleted="no").\
        filter_by(domain=fqdomain).\
        with_lockmode('update').\
        first()


def _dnsdomain_get_or_create(context, fqdomain):
    domain_ref = dnsdomain_get(context, fqdomain)
    if not domain_ref:
        dns_ref = models.DNSDomain()
        dns_ref.update({'domain': fqdomain,
                        'availability_zone': None,
                        'project_id': None})
        return dns_ref

    return domain_ref


@pick_context_manager_writer
def dnsdomain_register_for_zone(context, fqdomain, zone):
    domain_ref = _dnsdomain_get_or_create(context, fqdomain)
    domain_ref.scope = 'private'
    domain_ref.availability_zone = zone
    context.session.add(domain_ref)


@pick_context_manager_writer
def dnsdomain_register_for_project(context, fqdomain, project):
    domain_ref = _dnsdomain_get_or_create(context, fqdomain)
    domain_ref.scope = 'public'
    domain_ref.project_id = project
    context.session.add(domain_ref)


@pick_context_manager_writer
def dnsdomain_unregister(context, fqdomain):
    model_query(context, models.DNSDomain).\
        filter_by(domain=fqdomain).\
        delete()


@pick_context_manager_reader
def dnsdomain_get_all(context):
    return model_query(context, models.DNSDomain, read_deleted="no").all()


###################


@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def fixed_ip_associate(context, address, instance_uuid, network_id=None,
                       reserved=False, virtual_interface_id=None):
    """Keyword arguments:
    reserved -- should be a boolean value (True or False); the exact value
    is used to filter on the fixed IP address
    """
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    network_or_none = or_(models.FixedIp.network_id == network_id,
                          models.FixedIp.network_id == null())
    fixed_ip_ref = model_query(context, models.FixedIp, read_deleted="no").\
        filter(network_or_none).\
        filter_by(reserved=reserved).\
        filter_by(address=address).\
        first()

    if fixed_ip_ref is None:
        raise exception.FixedIpNotFoundForNetwork(address=address,
                                                  network_uuid=network_id)
    if fixed_ip_ref.instance_uuid:
        raise exception.FixedIpAlreadyInUse(address=address,
                                            instance_uuid=instance_uuid)

    params = {'instance_uuid': instance_uuid,
              'allocated': virtual_interface_id is not None}
    if not fixed_ip_ref.network_id:
        params['network_id'] = network_id
    if virtual_interface_id:
        params['virtual_interface_id'] = virtual_interface_id

    rows_updated = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(id=fixed_ip_ref.id).\
        filter(network_or_none).\
        filter_by(reserved=reserved).\
        filter_by(address=address).\
        update(params, synchronize_session='evaluate')

    if not rows_updated:
        LOG.debug('The row was updated in a concurrent transaction, '
                  'we will fetch another row')
        raise db_exc.RetryRequest(
            exception.FixedIpAssociateFailed(net=network_id))

    return fixed_ip_ref


@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def fixed_ip_associate_pool(context, network_id, instance_uuid=None,
                            host=None, virtual_interface_id=None):
    """Allocate a fixed IP out of a fixed IP network pool.

    This allocates an unallocated fixed IP out of a specified
    network. We sort by updated_at to hand out the oldest address in
    the list.

    """
    if instance_uuid and not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    network_or_none = or_(models.FixedIp.network_id == network_id,
                          models.FixedIp.network_id == null())
    fixed_ip_ref = model_query(context, models.FixedIp, read_deleted="no").\
        filter(network_or_none).\
        filter_by(reserved=False).\
        filter_by(instance_uuid=None).\
        filter_by(host=None).\
        filter_by(leased=False).\
        order_by(asc(models.FixedIp.updated_at)).\
        first()

    if not fixed_ip_ref:
        raise exception.NoMoreFixedIps(net=network_id)

    params = {'allocated': virtual_interface_id is not None}
    if fixed_ip_ref['network_id'] is None:
        params['network_id'] = network_id
    if instance_uuid:
        params['instance_uuid'] = instance_uuid
    if host:
        params['host'] = host
    if virtual_interface_id:
        params['virtual_interface_id'] = virtual_interface_id

    rows_updated = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(id=fixed_ip_ref['id']).\
        filter_by(network_id=fixed_ip_ref['network_id']).\
        filter_by(reserved=False).\
        filter_by(instance_uuid=None).\
        filter_by(host=None).\
        filter_by(leased=False).\
        filter_by(address=fixed_ip_ref['address']).\
        update(params, synchronize_session='evaluate')

    if not rows_updated:
        LOG.debug('The row was updated in a concurrent transaction, '
                  'we will fetch another row')
        raise db_exc.RetryRequest(
            exception.FixedIpAssociateFailed(net=network_id))

    return fixed_ip_ref


@require_context
@pick_context_manager_writer
def fixed_ip_create(context, values):
    fixed_ip_ref = models.FixedIp()
    fixed_ip_ref.update(values)
    try:
        fixed_ip_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        raise exception.FixedIpExists(address=values['address'])
    return fixed_ip_ref


@require_context
@pick_context_manager_writer
def fixed_ip_bulk_create(context, ips):
    try:
        tab = models.FixedIp.__table__
        context.session.execute(tab.insert(), ips)
    except db_exc.DBDuplicateEntry as e:
        raise exception.FixedIpExists(address=e.value)


@require_context
@pick_context_manager_writer
def fixed_ip_disassociate(context, address):
    _fixed_ip_get_by_address(context, address).update(
        {'instance_uuid': None,
         'virtual_interface_id': None})


@pick_context_manager_writer
def fixed_ip_disassociate_all_by_timeout(context, host, time):
    # NOTE(vish): only update fixed ips that "belong" to this
    #             host; i.e. the network host or the instance
    #             host matches. Two queries necessary because
    #             join with update doesn't work.
    host_filter = or_(and_(models.Instance.host == host,
                           models.Network.multi_host == true()),
                      models.Network.host == host)
    result = model_query(context, models.FixedIp, (models.FixedIp.id,),
                         read_deleted="no").\
        filter(models.FixedIp.allocated == false()).\
        filter(models.FixedIp.updated_at < time).\
        join((models.Network,
              models.Network.id == models.FixedIp.network_id)).\
        join((models.Instance,
              models.Instance.uuid == models.FixedIp.instance_uuid)).\
        filter(host_filter).\
        all()
    fixed_ip_ids = [fip[0] for fip in result]
    if not fixed_ip_ids:
        return 0
    result = model_query(context, models.FixedIp).\
        filter(models.FixedIp.id.in_(fixed_ip_ids)).\
        update({'instance_uuid': None,
                'leased': False,
                'updated_at': timeutils.utcnow()},
               synchronize_session='fetch')
    return result


@require_context
@pick_context_manager_reader
def fixed_ip_get(context, id, get_network=False):
    query = model_query(context, models.FixedIp).filter_by(id=id)
    if get_network:
        query = query.options(joinedload('network'))
    result = query.first()
    if not result:
        raise exception.FixedIpNotFound(id=id)

    # FIXME(sirp): shouldn't we just use project_only here to restrict the
    # results?
    if (nova.context.is_user_context(context) and
            result['instance_uuid'] is not None):
        instance = instance_get_by_uuid(context.elevated(read_deleted='yes'),
                                        result['instance_uuid'])
        nova.context.authorize_project_context(context, instance.project_id)

    return result


@pick_context_manager_reader
def fixed_ip_get_all(context):
    result = model_query(context, models.FixedIp, read_deleted="yes").all()
    if not result:
        raise exception.NoFixedIpsDefined()

    return result


@require_context
@pick_context_manager_reader
def fixed_ip_get_by_address(context, address, columns_to_join=None):
    return _fixed_ip_get_by_address(context, address,
                                    columns_to_join=columns_to_join)


def _fixed_ip_get_by_address(context, address, columns_to_join=None):
    if columns_to_join is None:
        columns_to_join = []

    try:
        result = model_query(context, models.FixedIp)
        for column in columns_to_join:
            result = result.options(joinedload_all(column))
        result = result.filter_by(address=address).first()
        if not result:
            raise exception.FixedIpNotFoundForAddress(address=address)
    except db_exc.DBError:
        msg = _("Invalid fixed IP Address %s in request") % address
        LOG.warning(msg)
        raise exception.FixedIpInvalid(msg)

    # NOTE(sirp): shouldn't we just use project_only here to restrict the
    # results?
    if (nova.context.is_user_context(context) and
            result['instance_uuid'] is not None):
        instance = _instance_get_by_uuid(
            context.elevated(read_deleted='yes'),
            result['instance_uuid'])
        nova.context.authorize_project_context(context,
                                               instance.project_id)
    return result


@require_context
@pick_context_manager_reader
def fixed_ip_get_by_floating_address(context, floating_address):
    return model_query(context, models.FixedIp).\
        join(models.FloatingIp,
             models.FloatingIp.fixed_ip_id ==
             models.FixedIp.id).\
        filter(models.FloatingIp.address == floating_address).\
        first()
    # NOTE(tr3buchet) please don't invent an exception here, None is fine


@require_context
@pick_context_manager_reader
def fixed_ip_get_by_instance(context, instance_uuid):
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    vif_and = and_(models.VirtualInterface.id ==
                   models.FixedIp.virtual_interface_id,
                   models.VirtualInterface.deleted == 0)
    result = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(instance_uuid=instance_uuid).\
        outerjoin(models.VirtualInterface, vif_and).\
        options(contains_eager("virtual_interface")).\
        options(joinedload('network')).\
        options(joinedload('floating_ips')).\
        order_by(asc(models.VirtualInterface.created_at),
                 asc(models.VirtualInterface.id)).\
        all()

    if not result:
        raise exception.FixedIpNotFoundForInstance(
            instance_uuid=instance_uuid)

    return result


@pick_context_manager_reader
def fixed_ip_get_by_host(context, host):
    instance_uuids = _instance_get_all_uuids_by_host(context, host)
    if not instance_uuids:
        return []

    return model_query(context, models.FixedIp).\
        filter(models.FixedIp.instance_uuid.in_(instance_uuids)).\
        all()


@require_context
@pick_context_manager_reader
def fixed_ip_get_by_network_host(context, network_id, host):
    result = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(network_id=network_id).\
        filter_by(host=host).\
        first()

    if not result:
        raise exception.FixedIpNotFoundForNetworkHost(network_id=network_id,
                                                      host=host)
    return result


@require_context
@pick_context_manager_reader
def fixed_ips_by_virtual_interface(context, vif_id):
    result = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(virtual_interface_id=vif_id).\
        options(joinedload('network')).\
        options(joinedload('floating_ips')).\
        all()

    return result


@require_context
@pick_context_manager_writer
def fixed_ip_update(context, address, values):
    _fixed_ip_get_by_address(context, address).update(values)


def _fixed_ip_count_by_project(context, project_id):
    nova.context.authorize_project_context(context, project_id)
    return model_query(context, models.FixedIp, (models.FixedIp.id,),
                       read_deleted="no").\
        join((models.Instance,
              models.Instance.uuid == models.FixedIp.instance_uuid)).\
        filter(models.Instance.project_id == project_id).\
        count()


###################


@require_context
@pick_context_manager_writer
def virtual_interface_create(context, values):
    """Create a new virtual interface record in the database.

    :param values: = dict containing column values
    """
    try:
        vif_ref = models.VirtualInterface()
        vif_ref.update(values)
        vif_ref.save(context.session)
    except db_exc.DBError:
        LOG.exception("VIF creation failed with a database error.")
        raise exception.VirtualInterfaceCreateException()

    return vif_ref


def _virtual_interface_query(context):
    return model_query(context, models.VirtualInterface, read_deleted="no")


@require_context
@pick_context_manager_writer
def virtual_interface_update(context, address, values):
    vif_ref = virtual_interface_get_by_address(context, address)
    vif_ref.update(values)
    vif_ref.save(context.session)
    return vif_ref


@require_context
@pick_context_manager_reader
def virtual_interface_get(context, vif_id):
    """Gets a virtual interface from the table.

    :param vif_id: = id of the virtual interface
    """
    vif_ref = _virtual_interface_query(context).\
        filter_by(id=vif_id).\
        first()
    return vif_ref


@require_context
@pick_context_manager_reader
def virtual_interface_get_by_address(context, address):
    """Gets a virtual interface from the table.

    :param address: = the address of the interface you're looking to get
    """
    try:
        vif_ref = _virtual_interface_query(context).\
            filter_by(address=address).\
            first()
    except db_exc.DBError:
        msg = _("Invalid virtual interface address %s in request") % address
        LOG.warning(msg)
        raise exception.InvalidIpAddressError(msg)
    return vif_ref


@require_context
@pick_context_manager_reader
def virtual_interface_get_by_uuid(context, vif_uuid):
    """Gets a virtual interface from the table.

    :param vif_uuid: the uuid of the interface you're looking to get
    """
    vif_ref = _virtual_interface_query(context).\
        filter_by(uuid=vif_uuid).\
        first()
    return vif_ref


@require_context
@require_instance_exists_using_uuid
@pick_context_manager_reader_allow_async
def virtual_interface_get_by_instance(context, instance_uuid):
    """Gets all virtual interfaces for instance.

    :param instance_uuid: = uuid of the instance to retrieve vifs for
    """
    vif_refs = _virtual_interface_query(context).\
        filter_by(instance_uuid=instance_uuid).\
        order_by(asc("created_at"), asc("id")).\
        all()
    return vif_refs


@require_context
@pick_context_manager_reader
def virtual_interface_get_by_instance_and_network(context, instance_uuid,
                                                  network_id):
    """Gets virtual interface for instance that's associated with network."""
    vif_ref = _virtual_interface_query(context).\
        filter_by(instance_uuid=instance_uuid).\
        filter_by(network_id=network_id).\
        first()
    return vif_ref


@require_context
@pick_context_manager_writer
def virtual_interface_delete_by_instance(context, instance_uuid):
    """Delete virtual interface records that are associated
    with the instance given by instance_id.

    :param instance_uuid: = uuid of instance
    """
    _virtual_interface_query(context).\
        filter_by(instance_uuid=instance_uuid).\
        soft_delete()


@require_context
@pick_context_manager_writer
def virtual_interface_delete(context, id):
    """Delete virtual interface records.

    :param id: id of the interface
    """
    _virtual_interface_query(context).\
        filter_by(id=id).\
        soft_delete()


@require_context
@pick_context_manager_reader
def virtual_interface_get_all(context):
    """Get all vifs."""
    vif_refs = _virtual_interface_query(context).all()
    return vif_refs


###################


def _metadata_refs(metadata_dict, meta_class):
    metadata_refs = []
    if metadata_dict:
        for k, v in metadata_dict.items():
            metadata_ref = meta_class()
            metadata_ref['key'] = k
            metadata_ref['value'] = v
            metadata_refs.append(metadata_ref)
    return metadata_refs
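
# NOTE: illustrative sketch, not part of the original module:
#
#     refs = _metadata_refs({'foo': 'bar'}, models.InstanceMetadata)
#     # -> a list with one unsaved InstanceMetadata(key='foo', value='bar')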
|
|
|
|
|
|
def _validate_unique_server_name(context, name):
|
|
if not CONF.osapi_compute_unique_server_name_scope:
|
|
return
|
|
|
|
lowername = name.lower()
|
|
base_query = model_query(context, models.Instance, read_deleted='no').\
|
|
filter(func.lower(models.Instance.hostname) == lowername)
|
|
|
|
if CONF.osapi_compute_unique_server_name_scope == 'project':
|
|
instance_with_same_name = base_query.\
|
|
filter_by(project_id=context.project_id).\
|
|
count()
|
|
|
|
elif CONF.osapi_compute_unique_server_name_scope == 'global':
|
|
instance_with_same_name = base_query.count()
|
|
|
|
else:
|
|
return
|
|
|
|
if instance_with_same_name > 0:
|
|
raise exception.InstanceExists(name=lowername)
|
|
|
|
|
|
def _handle_objects_related_type_conversions(values):
|
|
"""Make sure that certain things in values (which may have come from
|
|
an objects.instance.Instance object) are in suitable form for the
|
|
database.
|
|
"""
|
|
# NOTE(danms): Make sure IP addresses are passed as strings to
|
|
# the database engine
|
|
for key in ('access_ip_v4', 'access_ip_v6'):
|
|
if key in values and values[key] is not None:
|
|
values[key] = str(values[key])
|
|
|
|
datetime_keys = ('created_at', 'deleted_at', 'updated_at',
|
|
'launched_at', 'terminated_at')
|
|
convert_objects_related_datetimes(values, *datetime_keys)
|
|
|
|
|
|
def _check_instance_exists_in_project(context, instance_uuid):
|
|
if not model_query(context, models.Instance, read_deleted="no",
|
|
project_only=True).filter_by(
|
|
uuid=instance_uuid).first():
|
|
raise exception.InstanceNotFound(instance_id=instance_uuid)
|
|
|
|
|
|
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_create(context, values):
    """Create a new Instance record in the database.

    context - request context object
    values - dict containing column values.
    """

    security_group_ensure_default(context)

    values = values.copy()
    values['metadata'] = _metadata_refs(
        values.get('metadata'), models.InstanceMetadata)

    values['system_metadata'] = _metadata_refs(
        values.get('system_metadata'), models.InstanceSystemMetadata)
    _handle_objects_related_type_conversions(values)

    instance_ref = models.Instance()
    if not values.get('uuid'):
        values['uuid'] = uuidutils.generate_uuid()
    instance_ref['info_cache'] = models.InstanceInfoCache()
    info_cache = values.pop('info_cache', None)
    if info_cache is not None:
        instance_ref['info_cache'].update(info_cache)
    security_groups = values.pop('security_groups', [])
    instance_ref['extra'] = models.InstanceExtra()
    instance_ref['extra'].update(
        {'numa_topology': None,
         'pci_requests': None,
         'vcpu_model': None,
         })
    instance_ref['extra'].update(values.pop('extra', {}))
    instance_ref.update(values)

    def _get_sec_group_models(security_groups):
        models = []
        default_group = _security_group_ensure_default(context)
        if 'default' in security_groups:
            models.append(default_group)
            # Generate a new list, so we don't modify the original
            security_groups = [x for x in security_groups if x != 'default']
        if security_groups:
            models.extend(_security_group_get_by_names(
                context, security_groups))
        return models

    if 'hostname' in values:
        _validate_unique_server_name(context, values['hostname'])
    instance_ref.security_groups = _get_sec_group_models(security_groups)
    context.session.add(instance_ref)

    # create the instance uuid to ec2_id mapping entry for instance
    ec2_instance_create(context, instance_ref['uuid'])

    # Parity with the return value of instance_get_all_by_filters_sort()
    # Obviously a newly-created instance record can't already have a fault
    # record because of the FK constraint, so this is fine.
    instance_ref.fault = None

    return instance_ref


def _instance_data_get_for_user(context, project_id, user_id):
    not_soft_deleted = or_(
        models.Instance.vm_state != vm_states.SOFT_DELETED,
        models.Instance.vm_state == null()
        )
    result = model_query(context, models.Instance, (
        func.count(models.Instance.id),
        func.sum(models.Instance.vcpus),
        func.sum(models.Instance.memory_mb))).\
        filter_by(project_id=project_id).filter(not_soft_deleted)
    if user_id:
        result = result.filter_by(user_id=user_id).first()
    else:
        result = result.first()
    # NOTE(vish): convert None to 0
    return (result[0] or 0, result[1] or 0, result[2] or 0)


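# Example (illustrative): the helper returns a (count, vcpus, memory_mb)
# tuple with None coerced to 0, so a project with no matching instances
# yields (0, 0, 0).  The project id below is made up for illustration:
#
#     instances, cores, ram = _instance_data_get_for_user(
#         ctxt, 'some-project-id', user_id=None)

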
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_destroy(context, instance_uuid, constraint=None):
    if uuidutils.is_uuid_like(instance_uuid):
        instance_ref = _instance_get_by_uuid(context, instance_uuid)
    else:
        # Pass the uuid as a keyword so InvalidUUID can format its message.
        raise exception.InvalidUUID(uuid=instance_uuid)

    query = model_query(context, models.Instance).\
            filter_by(uuid=instance_uuid)
    if constraint is not None:
        query = constraint.apply(models.Instance, query)
    count = query.soft_delete()
    if count == 0:
        raise exception.ConstraintNotMet()
    model_query(context, models.SecurityGroupInstanceAssociation).\
            filter_by(instance_uuid=instance_uuid).\
            soft_delete()
    model_query(context, models.InstanceInfoCache).\
            filter_by(instance_uuid=instance_uuid).\
            soft_delete()
    model_query(context, models.InstanceMetadata).\
            filter_by(instance_uuid=instance_uuid).\
            soft_delete()
    model_query(context, models.InstanceFault).\
            filter_by(instance_uuid=instance_uuid).\
            soft_delete()
    model_query(context, models.InstanceExtra).\
            filter_by(instance_uuid=instance_uuid).\
            soft_delete()
    model_query(context, models.InstanceSystemMetadata).\
            filter_by(instance_uuid=instance_uuid).\
            soft_delete()
    model_query(context, models.InstanceGroupMember).\
            filter_by(instance_id=instance_uuid).\
            soft_delete()
    model_query(context, models.BlockDeviceMapping).\
            filter_by(instance_uuid=instance_uuid).\
            soft_delete()
    model_query(context, models.Migration).\
            filter_by(instance_uuid=instance_uuid).\
            soft_delete()
    model_query(context, models.InstanceIdMapping).filter_by(
        uuid=instance_uuid).soft_delete()
    # NOTE(snikitin): We can't use model_query here, because there is no
    # column 'deleted' in 'tags' or 'console_auth_tokens' tables.
    context.session.query(models.Tag).filter_by(
        resource_id=instance_uuid).delete()
    context.session.query(models.ConsoleAuthToken).filter_by(
        instance_uuid=instance_uuid).delete()
    # NOTE(cfriesen): We intentionally do not soft-delete entries in the
    # instance_actions or instance_actions_events tables because they
    # can be used by operators to find out what actions were performed on a
    # deleted instance. Both of these tables are special-cased in
    # _archive_deleted_rows_for_table().

    return instance_ref


@require_context
@pick_context_manager_reader_allow_async
def instance_get_by_uuid(context, uuid, columns_to_join=None):
    return _instance_get_by_uuid(context, uuid,
                                 columns_to_join=columns_to_join)


def _instance_get_by_uuid(context, uuid, columns_to_join=None):
    result = _build_instance_get(context, columns_to_join=columns_to_join).\
        filter_by(uuid=uuid).\
        first()

    if not result:
        raise exception.InstanceNotFound(instance_id=uuid)

    return result


@require_context
@pick_context_manager_reader
def instance_get(context, instance_id, columns_to_join=None):
    try:
        result = _build_instance_get(context, columns_to_join=columns_to_join
                                     ).filter_by(id=instance_id).first()

        if not result:
            raise exception.InstanceNotFound(instance_id=instance_id)

        return result
    except db_exc.DBError:
        # NOTE(sdague): catch all in case the db engine chokes on the
        # id because it's too long of an int to store.
        LOG.warning("Invalid instance id %s in request", instance_id)
        raise exception.InvalidID(id=instance_id)


def _build_instance_get(context, columns_to_join=None):
    query = model_query(context, models.Instance, project_only=True).\
            options(joinedload_all('security_groups.rules')).\
            options(joinedload('info_cache'))
    if columns_to_join is None:
        columns_to_join = ['metadata', 'system_metadata']
    for column in columns_to_join:
        if column in ['info_cache', 'security_groups']:
            # Already always joined above
            continue
        if 'extra.' in column:
            query = query.options(undefer(column))
        else:
            query = query.options(joinedload(column))
    # NOTE(alaski) Stop lazy loading of columns not needed.
    for col in ['metadata', 'system_metadata']:
        if col not in columns_to_join:
            query = query.options(noload(col))
    return query


def _instances_fill_metadata(context, instances, manual_joins=None):
    """Selectively fill instances with manually-joined metadata. Note that
    each instance will be converted to a dict.

    :param context: security context
    :param instances: list of instances to fill
    :param manual_joins: list of tables to manually join (can be any
                         combination of 'metadata' and 'system_metadata' or
                         None to take the default of both)
    """
    uuids = [inst['uuid'] for inst in instances]

    if manual_joins is None:
        manual_joins = ['metadata', 'system_metadata']

    meta = collections.defaultdict(list)
    if 'metadata' in manual_joins:
        for row in _instance_metadata_get_multi(context, uuids):
            meta[row['instance_uuid']].append(row)

    sys_meta = collections.defaultdict(list)
    if 'system_metadata' in manual_joins:
        for row in _instance_system_metadata_get_multi(context, uuids):
            sys_meta[row['instance_uuid']].append(row)

    pcidevs = collections.defaultdict(list)
    if 'pci_devices' in manual_joins:
        for row in _instance_pcidevs_get_multi(context, uuids):
            pcidevs[row['instance_uuid']].append(row)

    if 'fault' in manual_joins:
        faults = instance_fault_get_by_instance_uuids(context, uuids,
                                                      latest=True)
    else:
        faults = {}

    filled_instances = []
    for inst in instances:
        inst = dict(inst)
        inst['system_metadata'] = sys_meta[inst['uuid']]
        inst['metadata'] = meta[inst['uuid']]
        if 'pci_devices' in manual_joins:
            inst['pci_devices'] = pcidevs[inst['uuid']]
        inst_faults = faults.get(inst['uuid'])
        inst['fault'] = inst_faults and inst_faults[0] or None
        filled_instances.append(inst)

    return filled_instances


def _manual_join_columns(columns_to_join):
    """Separate manually joined columns from columns_to_join

    If columns_to_join contains 'metadata', 'system_metadata', 'fault', or
    'pci_devices' those columns are removed from columns_to_join and added
    to a manual_joins list to be used with the _instances_fill_metadata method.

    The columns_to_join formal parameter is copied and not modified; the
    returned tuple contains the modified columns_to_join list to be used with
    joinedload in a model query.

    :param columns_to_join: List of columns to join in a model query.
    :return: tuple of (manual_joins, columns_to_join)
    """
    manual_joins = []
    columns_to_join_new = copy.copy(columns_to_join)
    for column in ('metadata', 'system_metadata', 'pci_devices', 'fault'):
        if column in columns_to_join_new:
            columns_to_join_new.remove(column)
            manual_joins.append(column)
    return manual_joins, columns_to_join_new


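# Example (illustrative, derived from the loop above): splitting a caller
# supplied columns_to_join list into the manually joined columns and the
# ones handed to joinedload():
#
#     manual, to_join = _manual_join_columns(
#         ['metadata', 'info_cache', 'fault'])
#     # manual == ['metadata', 'fault']
#     # to_join == ['info_cache']

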
@require_context
@pick_context_manager_reader
def instance_get_all(context, columns_to_join=None):
    if columns_to_join is None:
        columns_to_join_new = ['info_cache', 'security_groups']
        manual_joins = ['metadata', 'system_metadata']
    else:
        manual_joins, columns_to_join_new = (
            _manual_join_columns(columns_to_join))
    query = model_query(context, models.Instance)
    for column in columns_to_join_new:
        query = query.options(joinedload(column))
    if not context.is_admin:
        # If we're not admin context, add the appropriate filter.
        if context.project_id:
            query = query.filter_by(project_id=context.project_id)
        else:
            query = query.filter_by(user_id=context.user_id)
    instances = query.all()
    return _instances_fill_metadata(context, instances, manual_joins)


@require_context
@pick_context_manager_reader_allow_async
def instance_get_all_by_filters(context, filters, sort_key, sort_dir,
                                limit=None, marker=None, columns_to_join=None):
    """Return instances matching all filters sorted by the primary key.

    See instance_get_all_by_filters_sort for more information.
    """
    # Invoke the API with the multiple sort keys and directions using the
    # single sort key/direction
    return instance_get_all_by_filters_sort(context, filters, limit=limit,
                                            marker=marker,
                                            columns_to_join=columns_to_join,
                                            sort_keys=[sort_key],
                                            sort_dirs=[sort_dir])


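# Example (illustrative only; 'ctxt' and 'compute1' are made-up names):
# fetching non-deleted instances on a host, newest first, through the
# single sort key/direction wrapper above:
#
#     insts = instance_get_all_by_filters(
#         ctxt, {'host': 'compute1', 'deleted': False},
#         'created_at', 'desc', limit=100)

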
@require_context
@pick_context_manager_reader_allow_async
def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None,
                                     columns_to_join=None, sort_keys=None,
                                     sort_dirs=None):
    """Return instances that match all filters sorted by the given keys.
    Deleted instances will be returned by default, unless there's a filter
    that says otherwise.

    Depending on the name of a filter, matching for that filter is
    performed using either exact matching or regular expression
    matching. Exact matching is applied for the following filters::

    |   ['project_id', 'user_id', 'image_ref',
    |    'vm_state', 'instance_type_id', 'uuid',
    |    'metadata', 'host', 'system_metadata']


    A third type of filter (also using exact matching), filters
    based on instance metadata tags when supplied under a special
    key named 'filter'::

    |   filters = {
    |       'filter': [
    |           {'name': 'tag-key', 'value': '<metakey>'},
    |           {'name': 'tag-value', 'value': '<metaval>'},
    |           {'name': 'tag:<metakey>', 'value': '<metaval>'}
    |       ]
    |   }

    Special keys are used to tweak the query further::

    |   'changes-since' - only return instances updated after
    |   'deleted' - only return (or exclude) deleted instances
    |   'soft_deleted' - modify behavior of 'deleted' to either
    |                    include or exclude instances whose
    |                    vm_state is SOFT_DELETED.

    A fourth type of filter (also using exact matching), filters
    based on instance tags (not metadata tags). There are two types
    of these tags:

    `tags` -- One or more strings that will be used to filter results
            in an AND expression: T1 AND T2

    `tags-any` -- One or more strings that will be used to filter results in
            an OR expression: T1 OR T2

    `not-tags` -- One or more strings that will be used to filter results in
            a NOT AND expression: NOT (T1 AND T2)

    `not-tags-any` -- One or more strings that will be used to filter results
            in a NOT OR expression: NOT (T1 OR T2)

    Tags should be represented as a list::

    |   filters = {
    |       'tags': [some-tag, some-another-tag],
    |       'tags-any': [some-any-tag, some-another-any-tag],
    |       'not-tags': [some-not-tag, some-another-not-tag],
    |       'not-tags-any': [some-not-any-tag, some-another-not-any-tag]
    |   }

    """
    # NOTE(mriedem): If the limit is 0 there is no point in even going
    # to the database since nothing is going to be returned anyway.
    if limit == 0:
        return []

    sort_keys, sort_dirs = process_sort_params(sort_keys,
                                               sort_dirs,
                                               default_dir='desc')

    if columns_to_join is None:
        columns_to_join_new = ['info_cache', 'security_groups']
        manual_joins = ['metadata', 'system_metadata']
    else:
        manual_joins, columns_to_join_new = (
            _manual_join_columns(columns_to_join))

    query_prefix = context.session.query(models.Instance)
    for column in columns_to_join_new:
        if 'extra.' in column:
            query_prefix = query_prefix.options(undefer(column))
        else:
            query_prefix = query_prefix.options(joinedload(column))

    # Note: order_by is done in the sqlalchemy.utils.py paginate_query(),
    # no need to do it here as well

    # Make a copy of the filters dictionary to use going forward, as we'll
    # be modifying it and we shouldn't affect the caller's use of it.
    filters = copy.deepcopy(filters)

    if 'changes-since' in filters:
        changes_since = timeutils.normalize_time(filters['changes-since'])
        query_prefix = query_prefix.\
            filter(models.Instance.updated_at >= changes_since)

    if 'deleted' in filters:
        # Instances can be soft or hard deleted and the query needs to
        # include or exclude both
        deleted = filters.pop('deleted')
        if deleted:
            if filters.pop('soft_deleted', True):
                delete = or_(
                    models.Instance.deleted == models.Instance.id,
                    models.Instance.vm_state == vm_states.SOFT_DELETED
                    )
                query_prefix = query_prefix.\
                    filter(delete)
            else:
                query_prefix = query_prefix.\
                    filter(models.Instance.deleted == models.Instance.id)
        else:
            query_prefix = query_prefix.\
                filter_by(deleted=0)
            if not filters.pop('soft_deleted', False):
                # It would be better to have vm_state not be nullable
                # but until then we test it explicitly as a workaround.
                not_soft_deleted = or_(
                    models.Instance.vm_state != vm_states.SOFT_DELETED,
                    models.Instance.vm_state == null()
                    )
                query_prefix = query_prefix.filter(not_soft_deleted)

    if 'cleaned' in filters:
        cleaned = 1 if filters.pop('cleaned') else 0
        query_prefix = query_prefix.filter(models.Instance.cleaned == cleaned)

    if 'tags' in filters:
        tags = filters.pop('tags')
        # We build a JOIN ladder expression for each tag, JOIN'ing
        # the first tag to the instances table, and each subsequent
        # tag to the last JOIN'd tags table
        first_tag = tags.pop(0)
        query_prefix = query_prefix.join(models.Instance.tags)
        query_prefix = query_prefix.filter(models.Tag.tag == first_tag)

        for tag in tags:
            tag_alias = aliased(models.Tag)
            query_prefix = query_prefix.join(tag_alias,
                                             models.Instance.tags)
            query_prefix = query_prefix.filter(tag_alias.tag == tag)

    if 'tags-any' in filters:
        tags = filters.pop('tags-any')
        tag_alias = aliased(models.Tag)
        query_prefix = query_prefix.join(tag_alias, models.Instance.tags)
        query_prefix = query_prefix.filter(tag_alias.tag.in_(tags))

    if 'not-tags' in filters:
        tags = filters.pop('not-tags')
        first_tag = tags.pop(0)
        subq = query_prefix.session.query(models.Tag.resource_id)
        subq = subq.join(models.Instance.tags)
        subq = subq.filter(models.Tag.tag == first_tag)

        for tag in tags:
            tag_alias = aliased(models.Tag)
            subq = subq.join(tag_alias, models.Instance.tags)
            subq = subq.filter(tag_alias.tag == tag)

        query_prefix = query_prefix.filter(~models.Instance.uuid.in_(subq))

    if 'not-tags-any' in filters:
        tags = filters.pop('not-tags-any')
        query_prefix = query_prefix.filter(~models.Instance.tags.any(
            models.Tag.tag.in_(tags)))

    if not context.is_admin:
        # If we're not admin context, add the appropriate filter.
        if context.project_id:
            filters['project_id'] = context.project_id
        else:
            filters['user_id'] = context.user_id

    # Filters for exact matches that we can do along with the SQL query...
    # For other filters that don't match this, we will do regexp matching
    exact_match_filter_names = ['project_id', 'user_id', 'image_ref',
                                'vm_state', 'instance_type_id', 'uuid',
                                'metadata', 'host', 'task_state',
                                'system_metadata']

    # Filter the query
    query_prefix = _exact_instance_filter(query_prefix,
                                          filters, exact_match_filter_names)
    if query_prefix is None:
        return []
    query_prefix = _regex_instance_filter(query_prefix, filters)

    # paginate query
    if marker is not None:
        try:
            marker = _instance_get_by_uuid(
                context.elevated(read_deleted='yes'), marker)
        except exception.InstanceNotFound:
            raise exception.MarkerNotFound(marker=marker)
    try:
        query_prefix = sqlalchemyutils.paginate_query(query_prefix,
                                                      models.Instance, limit,
                                                      sort_keys,
                                                      marker=marker,
                                                      sort_dirs=sort_dirs)
    except db_exc.InvalidSortKey:
        raise exception.InvalidSortKey()

    return _instances_fill_metadata(context, query_prefix.all(), manual_joins)


@require_context
@pick_context_manager_reader_allow_async
def instance_get_by_sort_filters(context, sort_keys, sort_dirs, values):
    """Attempt to get a single instance based on a combination of sort
    keys, directions and filter values. This is used to try to find a
    marker instance when we don't have a marker uuid.

    This returns just the uuid of the instance that matched.
    """

    model = models.Instance
    return _model_get_uuid_by_sort_filters(context, model, sort_keys,
                                           sort_dirs, values)


def _model_get_uuid_by_sort_filters(context, model, sort_keys, sort_dirs,
                                    values):
    query = context.session.query(model.uuid)

    # NOTE(danms): Below is a re-implementation of our
    # oslo_db.sqlalchemy.utils.paginate_query() utility. We can't use that
    # directly because it does not return the marker and we need it to.
    # The below is basically the same algorithm, stripped down to just what
    # we need, and augmented with the filter criteria required for us to
    # get back the instance that would correspond to our query.

    # This is our position in sort_keys,sort_dirs,values for the loop below
    key_index = 0

    # We build a list of criteria to apply to the query, which looks
    # approximately like this (assuming all ascending):
    #
    #  OR(row.key1 > val1,
    #     AND(row.key1 == val1, row.key2 > val2),
    #     AND(row.key1 == val1, row.key2 == val2, row.key3 >= val3),
    #  )
    #
    # The final key is compared with the "or equal" variant so that
    # a complete match instance is still returned.
    criteria = []

    for skey, sdir, val in zip(sort_keys, sort_dirs, values):
        # Apply ordering to our query for the key, direction we're processing
        if sdir == 'desc':
            query = query.order_by(desc(getattr(model, skey)))
        else:
            query = query.order_by(asc(getattr(model, skey)))

        # Build a list of equivalence requirements on keys we've already
        # processed through the loop. In other words, if we're adding
        # key2 > val2, make sure that key1 == val1
        crit_attrs = []
        for equal_attr in range(0, key_index):
            crit_attrs.append(
                (getattr(model, sort_keys[equal_attr]) == values[equal_attr]))

        model_attr = getattr(model, skey)
        if isinstance(model_attr.type, Boolean):
            model_attr = cast(model_attr, Integer)
            val = int(val)

        if skey == sort_keys[-1]:
            # If we are the last key, then we should use or-equal to
            # allow a complete match to be returned
            if sdir == 'asc':
                crit = (model_attr >= val)
            else:
                crit = (model_attr <= val)
        else:
            # If we're not the last key, then strict greater or less than
            # so we order strictly.
            if sdir == 'asc':
                crit = (model_attr > val)
            else:
                crit = (model_attr < val)

        # AND together all the above
        crit_attrs.append(crit)
        criteria.append(and_(*crit_attrs))
        key_index += 1

    # OR together all the ANDs
    query = query.filter(or_(*criteria))

    # We can't raise InstanceNotFound because we don't have a uuid to
    # be looking for, so just return nothing if no match.
    result = query.limit(1).first()
    if result:
        # We're querying for a single column, which means we get back a
        # tuple of one thing. Strip that out and just return the uuid
        # for our caller.
        return result[0]
    else:
        return result


def _db_connection_type(db_connection):
    """Returns a lowercase symbol for the db type.

    This is useful when we need to change what we are doing per DB
    (like handling regexes). In a CellsV2 world it probably needs to
    do something better than use the database configuration string.
    """

    db_string = db_connection.split(':')[0].split('+')[0]
    return db_string.lower()


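# Example (illustrative): plain and driver-qualified connection strings
# reduce to the same symbol:
#
#     _db_connection_type('mysql+pymysql://nova:pass@db/nova')  # -> 'mysql'
#     _db_connection_type('sqlite:///nova.sqlite')              # -> 'sqlite'

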
def _safe_regex_mysql(raw_string):
    """Make a regex safe for MySQL.

    Certain items like '|' are interpreted raw by mysql REGEX. If you
    search for a single | then you trigger an error because it's
    expecting content on either side.

    For consistency's sake we escape all '|'. This does mean we wouldn't
    support something like foo|bar to match completely different
    things, however, one can argue putting such complicated regex into
    name search probably means you are doing this wrong.
    """
    return raw_string.replace('|', '\\|')


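# Example (illustrative): '|' is escaped so MySQL's REGEXP does not treat it
# as alternation:
#
#     _safe_regex_mysql('foo|bar')  # -> 'foo\\|bar'

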
def _get_regexp_ops(connection):
    """Return safety filter and db opts for regex."""
    regexp_op_map = {
        'postgresql': '~',
        'mysql': 'REGEXP',
        'sqlite': 'REGEXP'
    }
    regex_safe_filters = {
        'mysql': _safe_regex_mysql
    }
    db_type = _db_connection_type(connection)

    return (regex_safe_filters.get(db_type, lambda x: x),
            regexp_op_map.get(db_type, 'LIKE'))


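# Example (illustrative): the returned pair is (safety filter, operator);
# backends without a regexp operator fall back to a plain LIKE with an
# identity filter:
#
#     _get_regexp_ops('mysql+pymysql://nova:pass@db/nova')
#     # -> (_safe_regex_mysql, 'REGEXP')
#     _get_regexp_ops('somedb://host/nova')
#     # -> (<identity lambda>, 'LIKE')

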
def _regex_instance_filter(query, filters):

    """Applies regular expression filtering to an Instance query.

    Returns the updated query.

    :param query: query to apply filters to
    :param filters: dictionary of filters with regex values
    """

    model = models.Instance
    safe_regex_filter, db_regexp_op = _get_regexp_ops(CONF.database.connection)
    for filter_name in filters:
        try:
            column_attr = getattr(model, filter_name)
        except AttributeError:
            continue
        if 'property' == type(column_attr).__name__:
            continue
        filter_val = filters[filter_name]
        # Sometimes the REGEX filter value is not a string
        if not isinstance(filter_val, six.string_types):
            filter_val = str(filter_val)
        if db_regexp_op == 'LIKE':
            query = query.filter(column_attr.op(db_regexp_op)(
                u'%' + filter_val + u'%'))
        else:
            filter_val = safe_regex_filter(filter_val)
            query = query.filter(column_attr.op(db_regexp_op)(
                filter_val))
    return query


def _exact_instance_filter(query, filters, legal_keys):
    """Applies exact match filtering to an Instance query.

    Returns the updated query. Modifies filters argument to remove
    filters consumed.

    :param query: query to apply filters to
    :param filters: dictionary of filters; values that are lists,
                    tuples, sets, or frozensets cause an 'IN' test to
                    be performed, while exact matching ('==' operator)
                    is used for other values
    :param legal_keys: list of keys to apply exact filtering to
    """

    filter_dict = {}
    model = models.Instance

    # Walk through all the keys
    for key in legal_keys:
        # Skip ones we're not filtering on
        if key not in filters:
            continue

        # OK, filtering on this key; what value do we search for?
        value = filters.pop(key)

        if key in ('metadata', 'system_metadata'):
            column_attr = getattr(model, key)
            if isinstance(value, list):
                for item in value:
                    for k, v in item.items():
                        query = query.filter(column_attr.any(key=k))
                        query = query.filter(column_attr.any(value=v))

            else:
                for k, v in value.items():
                    query = query.filter(column_attr.any(key=k))
                    query = query.filter(column_attr.any(value=v))
        elif isinstance(value, (list, tuple, set, frozenset)):
            if not value:
                return None  # empty IN-predicate; short circuit
            # Looking for values in a list; apply to query directly
            column_attr = getattr(model, key)
            query = query.filter(column_attr.in_(value))
        else:
            # OK, simple exact match; save for later
            filter_dict[key] = value

    # Apply simple exact matches
    if filter_dict:
        query = query.filter(*[getattr(models.Instance, k) == v
                               for k, v in filter_dict.items()])
    return query


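# Example (illustrative): with legal_keys=['uuid', 'host'], list values
# become an IN clause while scalars become equality tests:
#
#     {'uuid': ['uuid-1', 'uuid-2']}  ->  Instance.uuid IN ('uuid-1', 'uuid-2')
#     {'host': 'compute1'}            ->  Instance.host == 'compute1'
#
# An empty list short-circuits the whole query by returning None.

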
def process_sort_params(sort_keys, sort_dirs,
                        default_keys=['created_at', 'id'],
                        default_dir='asc'):
    """Process the sort parameters to include default keys.

    Creates a list of sort keys and a list of sort directions. Adds the
    default keys to the end of the list if they are not already included.

    When adding the default keys to the sort keys list, the associated
    direction is:
    1) The first element in the 'sort_dirs' list (if specified), else
    2) 'default_dir' value (Note that 'asc' is the default value since this is
    the default in sqlalchemy.utils.paginate_query)

    :param sort_keys: List of sort keys to include in the processed list
    :param sort_dirs: List of sort directions to include in the processed list
    :param default_keys: List of sort keys that need to be included in the
                         processed list; they are added at the end of the list
                         if not already specified.
    :param default_dir: Sort direction associated with each of the default
                        keys that are not supplied, used when they are added
                        to the processed list
    :returns: list of sort keys, list of sort directions
    :raise exception.InvalidInput: If more sort directions than sort keys
                                   are specified or if an invalid sort
                                   direction is specified
    """
    # Determine direction to use for when adding default keys
    if sort_dirs and len(sort_dirs) != 0:
        default_dir_value = sort_dirs[0]
    else:
        default_dir_value = default_dir

    # Create list of keys (do not modify the input list)
    if sort_keys:
        result_keys = list(sort_keys)
    else:
        result_keys = []

    # If a list of directions is not provided, use the default sort direction
    # for all provided keys
    if sort_dirs:
        result_dirs = []
        # Verify sort direction
        for sort_dir in sort_dirs:
            if sort_dir not in ('asc', 'desc'):
                msg = _("Unknown sort direction, must be 'desc' or 'asc'")
                raise exception.InvalidInput(reason=msg)
            result_dirs.append(sort_dir)
    else:
        result_dirs = [default_dir_value for _sort_key in result_keys]

    # Ensure that the key and direction length match
    while len(result_dirs) < len(result_keys):
        result_dirs.append(default_dir_value)
    # Unless more directions are specified, which is an error
    if len(result_dirs) > len(result_keys):
        msg = _("Sort direction size exceeds sort key size")
        raise exception.InvalidInput(reason=msg)

    # Ensure defaults are included
    for key in default_keys:
        if key not in result_keys:
            result_keys.append(key)
            result_dirs.append(default_dir_value)

    return result_keys, result_dirs


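# Example (illustrative, derived from the function above): defaults are
# appended using the first supplied direction, or default_dir when no
# directions are given:
#
#     process_sort_params(['display_name'], None, default_dir='desc')
#     # -> (['display_name', 'created_at', 'id'], ['desc', 'desc', 'desc'])
#
#     process_sort_params(['hostname', 'id'], ['asc'])
#     # -> (['hostname', 'id', 'created_at'], ['asc', 'asc', 'asc'])

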
@require_context
@pick_context_manager_reader_allow_async
def instance_get_active_by_window_joined(context, begin, end=None,
                                         project_id=None, host=None,
                                         columns_to_join=None, limit=None,
                                         marker=None):
    """Return instances and joins that were active during window."""
    query = context.session.query(models.Instance)

    if columns_to_join is None:
        columns_to_join_new = ['info_cache', 'security_groups']
        manual_joins = ['metadata', 'system_metadata']
    else:
        manual_joins, columns_to_join_new = (
            _manual_join_columns(columns_to_join))

    for column in columns_to_join_new:
        if 'extra.' in column:
            query = query.options(undefer(column))
        else:
            query = query.options(joinedload(column))

    query = query.filter(or_(models.Instance.terminated_at == null(),
                             models.Instance.terminated_at > begin))
    if end:
        query = query.filter(models.Instance.launched_at < end)
    if project_id:
        query = query.filter_by(project_id=project_id)
    if host:
        query = query.filter_by(host=host)

    if marker is not None:
        try:
            marker = _instance_get_by_uuid(
                context.elevated(read_deleted='yes'), marker)
        except exception.InstanceNotFound:
            raise exception.MarkerNotFound(marker=marker)

    query = sqlalchemyutils.paginate_query(
        query, models.Instance, limit, ['project_id', 'uuid'], marker=marker)

    return _instances_fill_metadata(context, query.all(), manual_joins)


def _instance_get_all_query(context, project_only=False, joins=None):
    if joins is None:
        joins = ['info_cache', 'security_groups']

    query = model_query(context,
                        models.Instance,
                        project_only=project_only)
    for column in joins:
        if 'extra.' in column:
            query = query.options(undefer(column))
        else:
            query = query.options(joinedload(column))
    return query


@pick_context_manager_reader_allow_async
def instance_get_all_by_host(context, host, columns_to_join=None):
    query = _instance_get_all_query(context, joins=columns_to_join)
    return _instances_fill_metadata(context,
                                    query.filter_by(host=host).all(),
                                    manual_joins=columns_to_join)


def _instance_get_all_uuids_by_host(context, host):
    """Return a list of the instance uuids on a given host.

    Returns a list of UUIDs, not Instance model objects.
    """
    uuids = []