From a009d30a580b26d9e103ca29fddaa481f544f7db Mon Sep 17 00:00:00 2001 From: Graham Hayes Date: Fri, 10 Apr 2015 14:19:24 +0100 Subject: [PATCH] Moved RecordSet lookup to a custom join query * New function, to get all RRsets + Records in one go. Change-Id: I562acb1edbae0e92cbb54d290b1a4f37bd672b2a Closes-Bug: #1413472 --- designate/sqlalchemy/base.py | 210 ++++++++++++++++-- designate/sqlalchemy/utils.py | 94 +++++--- designate/storage/impl_sqlalchemy/__init__.py | 42 ++-- 3 files changed, 276 insertions(+), 70 deletions(-) diff --git a/designate/sqlalchemy/base.py b/designate/sqlalchemy/base.py index cb1277115..d4b642b23 100644 --- a/designate/sqlalchemy/base.py +++ b/designate/sqlalchemy/base.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. import abc +import operator import threading import six @@ -21,7 +22,6 @@ from oslo_db.sqlalchemy import utils as oslodb_utils from oslo_db import exception as oslo_db_exception from oslo_log import log as logging from oslo_utils import timeutils -from sqlalchemy import exc as sqlalchemy_exc from sqlalchemy import select, or_ from designate import exceptions @@ -194,6 +194,7 @@ class SQLAlchemy(object): def _find(self, context, table, cls, list_cls, exc_notfound, criterion, one=False, marker=None, limit=None, sort_key=None, sort_dir=None, query=None, apply_tenant_criteria=True): + sort_key = sort_key or 'created_at' sort_dir = sort_dir or 'asc' @@ -220,24 +221,7 @@ class SQLAlchemy(object): return _set_object_from_model(cls(), results[0]) else: if marker is not None: - # If marker is not none and basestring we query it. - # Otherwise, return all matching records - marker_query = select([table]).where(table.c.id == marker) - - try: - marker_resultproxy = self.session.execute(marker_query) - marker = marker_resultproxy.fetchone() - if marker is None: - raise exceptions.MarkerNotFound( - 'Marker %s could not be found' % marker) - except oslo_db_exception.DBError as e: - # Malformed UUIDs return StatementError wrapped in a - # DBError - if isinstance(e.inner_exception, - sqlalchemy_exc.StatementError): - raise exceptions.InvalidMarker() - else: - raise + marker = utils.check_marker(table, marker, self.session) try: query = utils.paginate_query( @@ -258,6 +242,194 @@ class SQLAlchemy(object): except ValueError as value_error: raise exceptions.ValueError(value_error.message) + def _find_recordsets_with_records( + self, context, table, cls, + list_cls, exc_notfound, criterion, + one=False, marker=None, limit=None, sort_key=None, + sort_dir=None, query=None, apply_tenant_criteria=True, + load_relations=False, relation_table=None, relation_cls=None, + relation_list_cls=None, relation_not_found_exc=None): + + sort_key = sort_key or 'created_at' + sort_dir = sort_dir or 'asc' + + # Join the 2 required tables + rjoin = table.outerjoin( + relation_table, + relation_table.c.recordset_id == table.c.id) + + inner_q = select([table.c.id]) + + if marker is not None: + marker = utils.check_marker(table, marker, self.session) + + try: + inner_q = utils.paginate_query( + inner_q, table, limit, + [sort_key, 'id'], marker=marker, + sort_dir=sort_dir) + + except oslodb_utils.InvalidSortKey as sort_key_error: + raise exceptions.InvalidSortKey(sort_key_error.message) + # Any ValueErrors are propagated back to the user as is. + # Limits, sort_dir and sort_key are checked at the API layer. + # If however central or storage is called directly, invalid values + # show up as ValueError + except ValueError as value_error: + raise exceptions.ValueError(value_error.message) + + inner_q = self._apply_criterion(table, inner_q, criterion) + inner_q = self._apply_deleted_criteria(context, table, inner_q) + + # Get the list of IDs needed. + # This is a separate call due to + # http://dev.mysql.com/doc/mysql-reslimits-excerpt/5.6/en/subquery-restrictions.html # noqa + + inner_rproxy = self.session.execute(inner_q) + ids = inner_rproxy.fetchall() + + # formatted_ids = [id[0] for id in ids] + formatted_ids = map(operator.itemgetter(0), ids) + + query = select( + [ + # RS Info + table.c.id, # 0 - RS ID + table.c.version, # 1 - RS Version + table.c.created_at, # 2 - RS Created + table.c.updated_at, # 3 - RS Updated + table.c.tenant_id, # 4 - RS Tenant + table.c.domain_id, # 5 - RS Domain + table.c.name, # 6 - RS Name + table.c.type, # 7 - RS Type + table.c.ttl, # 8 - RS TTL + table.c.description, # 9 - RS Desc + # R Info + relation_table.c.id, # 10 - R ID + relation_table.c.version, # 11 - R Version + relation_table.c.created_at, # 12 - R Created + relation_table.c.updated_at, # 13 - R Updated + relation_table.c.tenant_id, # 14 - R Tenant + relation_table.c.domain_id, # 15 - R Domain + relation_table.c.recordset_id, # 16 - R RSet + relation_table.c.data, # 17 - R Data + relation_table.c.description, # 18 - R Desc + relation_table.c.hash, # 19 - R Hash + relation_table.c.managed, # 20 - R Mngd Flg + relation_table.c.managed_plugin_name, # 21 - R Mngd Plg + relation_table.c.managed_resource_type, # 22 - R Mngd Type + relation_table.c.managed_resource_region, # 23 - R Mngd Rgn + relation_table.c.managed_resource_id, # 24 - R Mngd ID + relation_table.c.managed_tenant_id, # 25 - R Mngd T ID + relation_table.c.status, # 26 - R Status + relation_table.c.action, # 27 - R Action + relation_table.c.serial # 28 - R Serial + ]).\ + select_from( + rjoin + ).\ + where( + table.c.id.in_(formatted_ids) + ) + + # These make looking up indexes for the Raw Rows much easier, + # and maintainable + + rs_map = { + "id": 0, + "version": 1, + "created_at": 2, + "updated_at": 3, + "tenant_id": 4, + "domain_id": 5, + "name": 6, + "type": 7, + "ttl": 8, + "description": 9, + } + + r_map = { + "id": 10, + "version": 11, + "created_at": 12, + "updated_at": 13, + "tenant_id": 14, + "domain_id": 15, + "recordset_id": 16, + "data": 17, + "description": 18, + "hash": 19, + "managed": 20, + "managed_plugin_name": 21, + "managed_resource_type": 22, + "managed_resource_region": 23, + "managed_resource_id": 24, + "managed_tenant_id": 25, + "status": 26, + "action": 27, + "serial": 28, + } + + query, sort_dirs = utils.sort_query(query, table, [sort_key, 'id'], + sort_dir=sort_dir) + + try: + resultproxy = self.session.execute(query) + raw_rows = resultproxy.fetchall() + + # Any ValueErrors are propagated back to the user as is. + # If however central or storage is called directly, invalid values + # show up as ValueError + except ValueError as value_error: + raise exceptions.ValueError(value_error.message) + + rrsets = list_cls() + rrset_id = None + current_rrset = None + + for record in raw_rows: + # If we're looking at the first, or a new rrset + if record[0] != rrset_id: + if current_rrset is not None: + # If this isn't the first iteration + rrsets.append(current_rrset) + # Set up a new rrset + current_rrset = cls() + + rrset_id = record[rs_map['id']] + + # Add all the loaded vars into RecordSet object + + for key, value in rs_map.iteritems(): + setattr(current_rrset, key, record[value]) + + current_rrset.records = relation_list_cls() + + if record[r_map['id']] is not None: + rrdata = relation_cls() + + for key, value in r_map.iteritems(): + setattr(rrdata, key, record[value]) + + current_rrset.records.append(rrdata) + + else: + # We've already got an rrset, add the rdata + if record[r_map['id']] is not None: + + for key, value in r_map.iteritems(): + setattr(rrdata, key, record[value]) + + current_rrset.records.append(rrdata) + + # If the last record examined was a new rrset, or there is only 1 rrset + if len(rrsets) == 0 or \ + (len(rrsets) != 0 and rrsets[-1] != current_rrset): + if current_rrset is not None: + rrsets.append(current_rrset) + + return rrsets + def _update(self, context, table, obj, exc_dup, exc_notfound, skip_values=None): diff --git a/designate/sqlalchemy/utils.py b/designate/sqlalchemy/utils.py index 6b899fabc..839cb8e0e 100644 --- a/designate/sqlalchemy/utils.py +++ b/designate/sqlalchemy/utils.py @@ -18,11 +18,15 @@ import logging import sqlalchemy +from sqlalchemy import exc as sqlalchemy_exc +from sqlalchemy import select from oslo_db.sqlalchemy import utils +from oslo_db import exception as oslo_db_exception from oslo_db.sqlalchemy.migration_cli import manager from designate.i18n import _ from designate.i18n import _LW +from designate import exceptions LOG = logging.getLogger(__name__) @@ -40,38 +44,9 @@ def get_migration_manager(repo_path, url, init_version=None): # copy from olso/db/sqlalchemy/utils.py def paginate_query(query, table, limit, sort_keys, marker=None, sort_dir=None, sort_dirs=None): - if 'id' not in sort_keys: - # TODO(justinsb): If this ever gives a false-positive, check - # the actual primary key, rather than assuming its id - LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?')) - - assert(not (sort_dir and sort_dirs)) - - # Default the sort direction to ascending - if sort_dirs is None and sort_dir is None: - sort_dir = 'asc' - - # Ensure a per-column sort direction - if sort_dirs is None: - sort_dirs = [sort_dir for _sort_key in sort_keys] - - assert(len(sort_dirs) == len(sort_keys)) # Add sorting - for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): - try: - sort_dir_func = { - 'asc': sqlalchemy.asc, - 'desc': sqlalchemy.desc, - }[current_sort_dir] - except KeyError: - raise ValueError(_("Unknown sort direction, " - "must be 'desc' or 'asc'")) - try: - sort_key_attr = getattr(table.c, current_sort_key) - except AttributeError: - raise utils.InvalidSortKey() - query = query.order_by(sort_dir_func(sort_key_attr)) + query, sort_dirs = sort_query(query, table, sort_keys, sort_dir=sort_dir) # Add pagination if marker is not None: @@ -104,3 +79,62 @@ def paginate_query(query, table, limit, sort_keys, marker=None, query = query.limit(limit) return query + + +def sort_query(query, table, sort_keys, sort_dir=None, sort_dirs=None): + + if 'id' not in sort_keys: + # TODO(justinsb): If this ever gives a false-positive, check + # the actual primary key, rather than assuming its id + LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?')) + + assert(not (sort_dir and sort_dirs)) + + # Default the sort direction to ascending + if sort_dirs is None and sort_dir is None: + sort_dir = 'asc' + + # Ensure a per-column sort direction + if sort_dirs is None: + sort_dirs = [sort_dir for _sort_key in sort_keys] + + assert(len(sort_dirs) == len(sort_keys)) + + for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): + try: + sort_dir_func = { + 'asc': sqlalchemy.asc, + 'desc': sqlalchemy.desc, + }[current_sort_dir] + except KeyError: + raise ValueError(_("Unknown sort direction, " + "must be 'desc' or 'asc'")) + try: + sort_key_attr = getattr(table.c, current_sort_key) + except AttributeError: + raise utils.InvalidSortKey() + query = query.order_by(sort_dir_func(sort_key_attr)) + + return query, sort_dirs + + +def check_marker(table, marker, session): + + marker_query = select([table]).where(table.c.id == marker) + + try: + marker_resultproxy = session.execute(marker_query) + marker = marker_resultproxy.fetchone() + if marker is None: + raise exceptions.MarkerNotFound( + 'Marker %s could not be found' % marker) + except oslo_db_exception.DBError as e: + # Malformed UUIDs return StatementError wrapped in a + # DBError + if isinstance(e.inner_exception, + sqlalchemy_exc.StatementError): + raise exceptions.InvalidMarker() + else: + raise + + return marker diff --git a/designate/storage/impl_sqlalchemy/__init__.py b/designate/storage/impl_sqlalchemy/__init__.py index 3b360cec3..62b54b705 100644 --- a/designate/storage/impl_sqlalchemy/__init__.py +++ b/designate/storage/impl_sqlalchemy/__init__.py @@ -434,43 +434,43 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage): # RecordSet Methods def _find_recordsets(self, context, criterion, one=False, marker=None, limit=None, sort_key=None, sort_dir=None): - query = None # Check to see if the criterion can use the reverse_name column criterion = self._rname_check(criterion) if criterion is not None \ and not criterion.get('domains_deleted', True): - # Ensure that we return only active recordsets + # remove 'domains_deleted' from the criterion, as _apply_criterion + # assumes each key in criterion to be a column name. + del criterion['domains_deleted'] + + if one: rjoin = tables.recordsets.join( tables.domains, tables.recordsets.c.domain_id == tables.domains.c.id) query = select([tables.recordsets]).select_from(rjoin).\ where(tables.domains.c.deleted == '0') - # remove 'domains_deleted' from the criterion, as _apply_criterion - # assumes each key in criterion to be a column name. - del criterion['domains_deleted'] - recordsets = self._find( - context, tables.recordsets, objects.RecordSet, - objects.RecordSetList, exceptions.RecordSetNotFound, criterion, - one, marker, limit, sort_key, sort_dir, query) + recordsets = self._find( + context, tables.recordsets, objects.RecordSet, + objects.RecordSetList, exceptions.RecordSetNotFound, criterion, + one, marker, limit, sort_key, sort_dir, query) - if not one: - recordsets.total_count = self.count_recordsets(context, criterion) + recordsets.records = self._find_records( + context, {'recordset_id': recordsets.id}) - # Load Relations - def _load_relations(recordset): - recordset.records = self._find_records( - context, {'recordset_id': recordset.id}) + recordsets.obj_reset_changes(['records']) - recordset.obj_reset_changes(['records']) - - if one: - _load_relations(recordsets) else: - for recordset in recordsets: - _load_relations(recordset) + recordsets = self._find_recordsets_with_records( + context, tables.recordsets, objects.RecordSet, + objects.RecordSetList, exceptions.RecordSetNotFound, criterion, + load_relations=True, relation_table=tables.records, + relation_cls=objects.Record, + relation_list_cls=objects.RecordList, limit=limit, + marker=marker, sort_key=sort_key, sort_dir=sort_dir) + + recordsets.total_count = self.count_recordsets(context, criterion) return recordsets