Merge "Moved RecordSet lookup to a custom join query"
commit 5d4eeca3e5
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import operator
import threading

import six
@@ -21,7 +22,6 @@ from oslo_db.sqlalchemy import utils as oslodb_utils
from oslo_db import exception as oslo_db_exception
from oslo_log import log as logging
from oslo_utils import timeutils
from sqlalchemy import exc as sqlalchemy_exc
from sqlalchemy import select, or_

from designate import exceptions
@@ -194,6 +194,7 @@ class SQLAlchemy(object):
    def _find(self, context, table, cls, list_cls, exc_notfound, criterion,
              one=False, marker=None, limit=None, sort_key=None,
              sort_dir=None, query=None, apply_tenant_criteria=True):

        sort_key = sort_key or 'created_at'
        sort_dir = sort_dir or 'asc'

@@ -220,24 +221,7 @@ class SQLAlchemy(object):
            return _set_object_from_model(cls(), results[0])
        else:
            if marker is not None:
                # If the marker is not None and is a string, we query for
                # it; otherwise, all matching records are returned.
                marker_query = select([table]).where(table.c.id == marker)

                try:
                    marker_resultproxy = self.session.execute(marker_query)
                    marker = marker_resultproxy.fetchone()
                    if marker is None:
                        raise exceptions.MarkerNotFound(
                            'Marker %s could not be found' % marker)
                except oslo_db_exception.DBError as e:
                    # Malformed UUIDs return a StatementError wrapped in
                    # a DBError
                    if isinstance(e.inner_exception,
                                  sqlalchemy_exc.StatementError):
                        raise exceptions.InvalidMarker()
                    else:
                        raise
                marker = utils.check_marker(table, marker, self.session)

            try:
                query = utils.paginate_query(
@@ -258,6 +242,194 @@ class SQLAlchemy(object):
        except ValueError as value_error:
            raise exceptions.ValueError(value_error.message)

    def _find_recordsets_with_records(
            self, context, table, cls,
            list_cls, exc_notfound, criterion,
            one=False, marker=None, limit=None, sort_key=None,
            sort_dir=None, query=None, apply_tenant_criteria=True,
            load_relations=False, relation_table=None, relation_cls=None,
            relation_list_cls=None, relation_not_found_exc=None):

        sort_key = sort_key or 'created_at'
        sort_dir = sort_dir or 'asc'

        # Join the two required tables
        rjoin = table.outerjoin(
            relation_table,
            relation_table.c.recordset_id == table.c.id)

        inner_q = select([table.c.id])

        if marker is not None:
            marker = utils.check_marker(table, marker, self.session)

        try:
            inner_q = utils.paginate_query(
                inner_q, table, limit,
                [sort_key, 'id'], marker=marker,
                sort_dir=sort_dir)

        except oslodb_utils.InvalidSortKey as sort_key_error:
            raise exceptions.InvalidSortKey(sort_key_error.message)
        # Any ValueErrors are propagated back to the user as-is.
        # Limits, sort_dir and sort_key are checked at the API layer; if,
        # however, central or storage is called directly, invalid values
        # show up as a ValueError.
        except ValueError as value_error:
            raise exceptions.ValueError(value_error.message)

        inner_q = self._apply_criterion(table, inner_q, criterion)
        inner_q = self._apply_deleted_criteria(context, table, inner_q)

        # Get the list of IDs needed.
        # This is a separate call due to
        # http://dev.mysql.com/doc/mysql-reslimits-excerpt/5.6/en/subquery-restrictions.html # noqa
        inner_rproxy = self.session.execute(inner_q)
        ids = inner_rproxy.fetchall()

        # formatted_ids = [id[0] for id in ids]
        formatted_ids = map(operator.itemgetter(0), ids)

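        # NOTE: the IDs are materialized in Python rather than used as a
        # subquery because MySQL cannot evaluate LIMIT inside an
        # IN (...) subquery (see the link above); the outer query below
        # receives them as literal values instead.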
        query = select(
            [
                # RS Info
                table.c.id,                                # 0 - RS ID
                table.c.version,                           # 1 - RS Version
                table.c.created_at,                        # 2 - RS Created
                table.c.updated_at,                        # 3 - RS Updated
                table.c.tenant_id,                         # 4 - RS Tenant
                table.c.domain_id,                         # 5 - RS Domain
                table.c.name,                              # 6 - RS Name
                table.c.type,                              # 7 - RS Type
                table.c.ttl,                               # 8 - RS TTL
                table.c.description,                       # 9 - RS Desc
                # R Info
                relation_table.c.id,                       # 10 - R ID
                relation_table.c.version,                  # 11 - R Version
                relation_table.c.created_at,               # 12 - R Created
                relation_table.c.updated_at,               # 13 - R Updated
                relation_table.c.tenant_id,                # 14 - R Tenant
                relation_table.c.domain_id,                # 15 - R Domain
                relation_table.c.recordset_id,             # 16 - R RSet
                relation_table.c.data,                     # 17 - R Data
                relation_table.c.description,              # 18 - R Desc
                relation_table.c.hash,                     # 19 - R Hash
                relation_table.c.managed,                  # 20 - R Mngd Flg
                relation_table.c.managed_plugin_name,      # 21 - R Mngd Plg
                relation_table.c.managed_resource_type,    # 22 - R Mngd Type
                relation_table.c.managed_resource_region,  # 23 - R Mngd Rgn
                relation_table.c.managed_resource_id,      # 24 - R Mngd ID
                relation_table.c.managed_tenant_id,        # 25 - R Mngd T ID
                relation_table.c.status,                   # 26 - R Status
                relation_table.c.action,                   # 27 - R Action
                relation_table.c.serial                    # 28 - R Serial
            ]).\
            select_from(rjoin).\
            where(table.c.id.in_(formatted_ids))

        # These maps make looking up indexes in the raw rows much easier
        # and more maintainable
        rs_map = {
            "id": 0,
            "version": 1,
            "created_at": 2,
            "updated_at": 3,
            "tenant_id": 4,
            "domain_id": 5,
            "name": 6,
            "type": 7,
            "ttl": 8,
            "description": 9,
        }

        r_map = {
            "id": 10,
            "version": 11,
            "created_at": 12,
            "updated_at": 13,
            "tenant_id": 14,
            "domain_id": 15,
            "recordset_id": 16,
            "data": 17,
            "description": 18,
            "hash": 19,
            "managed": 20,
            "managed_plugin_name": 21,
            "managed_resource_type": 22,
            "managed_resource_region": 23,
            "managed_resource_id": 24,
            "managed_tenant_id": 25,
            "status": 26,
            "action": 27,
            "serial": 28,
        }

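        # Example: record[rs_map['name']] reads column 6 and
        # record[r_map['data']] reads column 17, without magic numbers.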
        query, sort_dirs = utils.sort_query(query, table, [sort_key, 'id'],
                                            sort_dir=sort_dir)

        try:
            resultproxy = self.session.execute(query)
            raw_rows = resultproxy.fetchall()

        # Any ValueErrors are propagated back to the user as-is.
        # If, however, central or storage is called directly, invalid
        # values show up as a ValueError.
        except ValueError as value_error:
            raise exceptions.ValueError(value_error.message)

        rrsets = list_cls()
        rrset_id = None
        current_rrset = None

        for record in raw_rows:
            # If we're looking at the first rrset, or a new rrset
            if record[0] != rrset_id:
                if current_rrset is not None:
                    # If this isn't the first iteration
                    rrsets.append(current_rrset)
                # Set up a new rrset
                current_rrset = cls()

                rrset_id = record[rs_map['id']]

                # Add all the loaded vars into the RecordSet object
                for key, value in rs_map.iteritems():
                    setattr(current_rrset, key, record[value])

                current_rrset.records = relation_list_cls()

                if record[r_map['id']] is not None:
                    rrdata = relation_cls()

                    for key, value in r_map.iteritems():
                        setattr(rrdata, key, record[value])

                    current_rrset.records.append(rrdata)

            else:
                # We've already got an rrset, add the rdata
                if record[r_map['id']] is not None:
                    rrdata = relation_cls()

                    for key, value in r_map.iteritems():
                        setattr(rrdata, key, record[value])

                    current_rrset.records.append(rrdata)

        # If the last record examined started a new rrset, or there is
        # only one rrset
        if len(rrsets) == 0 or \
                (len(rrsets) != 0 and rrsets[-1] != current_rrset):
            if current_rrset is not None:
                rrsets.append(current_rrset)

        return rrsets

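The loop above depends on the joined rows arriving sorted by recordset id, so each recordset's records are contiguous. A minimal standalone sketch of the same grouping idea, using plain tuples and dicts rather than Designate's object classes (the data here is illustrative):

import itertools
import operator

# Each row is (recordset_id, recordset_name, record_id, record_data),
# as an outer join sorted by recordset_id would produce them.
rows = [
    (1, 'www.example.org.', 10, '192.0.2.1'),
    (1, 'www.example.org.', 11, '192.0.2.2'),
    (2, 'mail.example.org.', None, None),  # recordset with no records
]

recordsets = []
for rs_id, group in itertools.groupby(rows, key=operator.itemgetter(0)):
    group = list(group)
    recordsets.append({
        'id': rs_id,
        'name': group[0][1],
        # An outer join emits NULL record columns for empty recordsets,
        # so filter those rows out.
        'records': [{'id': r[2], 'data': r[3]}
                    for r in group if r[2] is not None],
    })

assert recordsets[1]['records'] == []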
    def _update(self, context, table, obj, exc_dup, exc_notfound,
                skip_values=None):

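For reference, the two-phase pattern this commit introduces can be sketched in isolation: paginate over the parent table alone, then fetch the joined rows for just those IDs. This is a hedged illustration using SQLAlchemy Core (the older select([...]) list syntax, matching this codebase) with made-up table definitions, not the module's actual code:

from sqlalchemy import (MetaData, Table, Column, Integer, String,
                        ForeignKey, create_engine, select)

metadata = MetaData()
recordsets = Table('recordsets', metadata,
                   Column('id', Integer, primary_key=True),
                   Column('name', String(255)))
records = Table('records', metadata,
                Column('id', Integer, primary_key=True),
                Column('recordset_id', ForeignKey('recordsets.id')),
                Column('data', String(255)))

engine = create_engine('sqlite://')
metadata.create_all(engine)

with engine.connect() as conn:
    # Phase 1: page over parent rows only, so LIMIT counts recordsets
    # rather than the multiplied rows of the join.
    inner = select([recordsets.c.id]).order_by(recordsets.c.id).limit(10)
    ids = [row[0] for row in conn.execute(inner)]

    # Phase 2: fetch the joined rows for exactly those parents, passing
    # the IDs as literals because MySQL cannot evaluate LIMIT inside an
    # IN (...) subquery.
    rjoin = recordsets.outerjoin(
        records, records.c.recordset_id == recordsets.c.id)
    outer = select([recordsets.c.id, recordsets.c.name,
                    records.c.id, records.c.data]).\
        select_from(rjoin).\
        where(recordsets.c.id.in_(ids)).\
        order_by(recordsets.c.id)
    rows = conn.execute(outer).fetchall()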
@@ -18,11 +18,15 @@
import logging

import sqlalchemy
from sqlalchemy import exc as sqlalchemy_exc
from sqlalchemy import select
from oslo_db.sqlalchemy import utils
from oslo_db import exception as oslo_db_exception
from oslo_db.sqlalchemy.migration_cli import manager

from designate.i18n import _
from designate.i18n import _LW
from designate import exceptions


LOG = logging.getLogger(__name__)
@@ -40,38 +44,9 @@ def get_migration_manager(repo_path, url, init_version=None):
# copied from oslo/db/sqlalchemy/utils.py
def paginate_query(query, table, limit, sort_keys, marker=None,
                   sort_dir=None, sort_dirs=None):
    if 'id' not in sort_keys:
        # TODO(justinsb): If this ever gives a false-positive, check
        # the actual primary key, rather than assuming it is 'id'
        LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?'))

    assert(not (sort_dir and sort_dirs))

    # Default the sort direction to ascending
    if sort_dirs is None and sort_dir is None:
        sort_dir = 'asc'

    # Ensure a per-column sort direction
    if sort_dirs is None:
        sort_dirs = [sort_dir for _sort_key in sort_keys]

    assert(len(sort_dirs) == len(sort_keys))

    # Add sorting
    for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):
        try:
            sort_dir_func = {
                'asc': sqlalchemy.asc,
                'desc': sqlalchemy.desc,
            }[current_sort_dir]
        except KeyError:
            raise ValueError(_("Unknown sort direction, "
                               "must be 'desc' or 'asc'"))
        try:
            sort_key_attr = getattr(table.c, current_sort_key)
        except AttributeError:
            raise utils.InvalidSortKey()
        query = query.order_by(sort_dir_func(sort_key_attr))
    query, sort_dirs = sort_query(query, table, sort_keys, sort_dir=sort_dir)

    # Add pagination
    if marker is not None:
@@ -104,3 +79,62 @@ def paginate_query(query, table, limit, sort_keys, marker=None,
        query = query.limit(limit)

    return query


def sort_query(query, table, sort_keys, sort_dir=None, sort_dirs=None):

    if 'id' not in sort_keys:
        # TODO(justinsb): If this ever gives a false-positive, check
        # the actual primary key, rather than assuming it is 'id'
        LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?'))

    assert(not (sort_dir and sort_dirs))

    # Default the sort direction to ascending
    if sort_dirs is None and sort_dir is None:
        sort_dir = 'asc'

    # Ensure a per-column sort direction
    if sort_dirs is None:
        sort_dirs = [sort_dir for _sort_key in sort_keys]

    assert(len(sort_dirs) == len(sort_keys))

    for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):
        try:
            sort_dir_func = {
                'asc': sqlalchemy.asc,
                'desc': sqlalchemy.desc,
            }[current_sort_dir]
        except KeyError:
            raise ValueError(_("Unknown sort direction, "
                               "must be 'desc' or 'asc'"))
        try:
            sort_key_attr = getattr(table.c, current_sort_key)
        except AttributeError:
            raise utils.InvalidSortKey()
        query = query.order_by(sort_dir_func(sort_key_attr))

    return query, sort_dirs

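A quick usage sketch for the helper above (the table and key values here are illustrative):

# Expands sort_dir into one direction per sort key and applies the
# ORDER BY clauses; paginate_query reuses the returned sort_dirs when
# building its marker comparison.
query, sort_dirs = sort_query(query, tables.recordsets,
                              ['created_at', 'id'], sort_dir='desc')
# sort_dirs == ['desc', 'desc']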
def check_marker(table, marker, session):

    marker_query = select([table]).where(table.c.id == marker)

    try:
        marker_resultproxy = session.execute(marker_query)
        marker = marker_resultproxy.fetchone()
        if marker is None:
            raise exceptions.MarkerNotFound(
                'Marker %s could not be found' % marker)
    except oslo_db_exception.DBError as e:
        # Malformed UUIDs return a StatementError wrapped in a DBError
        if isinstance(e.inner_exception,
                      sqlalchemy_exc.StatementError):
            raise exceptions.InvalidMarker()
        else:
            raise

    return marker
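Usage mirrors the call sites in _find and _find_recordsets_with_records above (a hedged sketch; the table name is illustrative):

# Returns the marker row when it exists; raises MarkerNotFound when no
# row has that id, and InvalidMarker when the value is malformed
# (e.g. not a valid UUID).
marker = check_marker(tables.recordsets, marker, session)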
@@ -434,43 +434,43 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
    # RecordSet Methods
    def _find_recordsets(self, context, criterion, one=False, marker=None,
                         limit=None, sort_key=None, sort_dir=None):
        query = None

        # Check to see if the criterion can use the reverse_name column
        criterion = self._rname_check(criterion)

        if criterion is not None \
                and not criterion.get('domains_deleted', True):
            # Ensure that we return only active recordsets
            # remove 'domains_deleted' from the criterion, as _apply_criterion
            # assumes each key in criterion to be a column name.
            del criterion['domains_deleted']

        if one:
            rjoin = tables.recordsets.join(
                tables.domains,
                tables.recordsets.c.domain_id == tables.domains.c.id)
            query = select([tables.recordsets]).select_from(rjoin).\
                where(tables.domains.c.deleted == '0')

            # remove 'domains_deleted' from the criterion, as _apply_criterion
            # assumes each key in criterion to be a column name.
            del criterion['domains_deleted']
            recordsets = self._find(
                context, tables.recordsets, objects.RecordSet,
                objects.RecordSetList, exceptions.RecordSetNotFound, criterion,
                one, marker, limit, sort_key, sort_dir, query)
            recordsets = self._find(
                context, tables.recordsets, objects.RecordSet,
                objects.RecordSetList, exceptions.RecordSetNotFound, criterion,
                one, marker, limit, sort_key, sort_dir, query)

        if not one:
            recordsets.total_count = self.count_recordsets(context, criterion)
            recordsets.records = self._find_records(
                context, {'recordset_id': recordsets.id})

        # Load Relations
        def _load_relations(recordset):
            recordset.records = self._find_records(
                context, {'recordset_id': recordset.id})
            recordsets.obj_reset_changes(['records'])

            recordset.obj_reset_changes(['records'])

        if one:
            _load_relations(recordsets)
        else:
            for recordset in recordsets:
                _load_relations(recordset)
            recordsets = self._find_recordsets_with_records(
                context, tables.recordsets, objects.RecordSet,
                objects.RecordSetList, exceptions.RecordSetNotFound, criterion,
                load_relations=True, relation_table=tables.records,
                relation_cls=objects.Record,
                relation_list_cls=objects.RecordList, limit=limit,
                marker=marker, sort_key=sort_key, sort_dir=sort_dir)

        recordsets.total_count = self.count_recordsets(context, criterion)

        return recordsets
