Fix data filtering with pagination

When listing recordsets with filtering on record data,
designate makes a first filtering against recordsets table
to retrieve a list of recordsets belonging to the specified
zone, followed by the second filtering against records table
on record data (e.g. IPs), then intersects the two.
Check https://github.com/openstack/designate/blob/master/designate/api/v2/controllers/recordsets.py

This approach does not work properly when pagination happens.
Imagine a zone has 21 A records with 10.* like IPs,
and 9 A records with 192.* like IPs. When requesting
"/v2/zone/{zone_id}/recordsets?data=10.*", designate makes
the first filtering and get the first page (i.e. 20 recordsets) which may
have both 192.* and 10.* records mixed together,
the second filtering actually excludes all 192.* records from the first page.
But that ends up with a page less than 20 records, so designate won't
include a 'next' link in the response.

This patch fixes the problem.

Closes-bug: #1561746
Change-Id: Ib06dd288d129ff4b39c388d80f24d179c6af28d8
This commit is contained in:
James Li 2016-03-30 05:09:01 +00:00
parent f34533c41d
commit 5879c90098
4 changed files with 117 additions and 94 deletions

View File

@ -20,7 +20,6 @@ from designate import exceptions
from designate import utils
from designate.api.v2.controllers import rest
from designate.objects import RecordSet
from designate.objects import RecordSetList
from designate.objects.adapters import DesignateAdapter
from designate.i18n import _LI
@ -68,40 +67,9 @@ class RecordSetsController(rest.RestController):
criterion['zone_id'] = zone_id
# Data must be filtered separately, through the Records table
data = criterion.pop('data', None)
status = criterion.pop('status', None)
# Retrieve recordsets
recordsets = self.central_api.find_recordsets(
context, criterion, marker, limit, sort_key, sort_dir)
# 'data' filter param: only return recordsets with matching data
if data:
records = self.central_api.find_records(
context, criterion={'data': data, 'zone_id': zone_id})
recordset_with_data_ids = set(record.recordset_id
for record in records)
new_rsets = RecordSetList()
for recordset in recordsets:
if recordset.id in recordset_with_data_ids:
new_rsets.append(recordset)
recordsets = new_rsets
recordsets.total_count = len(recordset_with_data_ids)
# 'status' filter param: only return recordsets with matching status
if status:
new_rsets = RecordSetList()
for recordset in recordsets:
if recordset.status == status:
new_rsets.append(recordset)
recordsets = new_rsets
LOG.info(_LI("Retrieved %(recordsets)s"), {'recordsets': recordsets})
return DesignateAdapter.render('API_v2', recordsets, request=request)

View File

@ -22,9 +22,10 @@ from oslo_db.sqlalchemy import utils as oslodb_utils
from oslo_db import exception as oslo_db_exception
from oslo_log import log as logging
from oslo_utils import timeutils
from sqlalchemy import select, or_, between
from sqlalchemy import select, or_, between, func, distinct
from designate import exceptions
from designate import objects
from designate.sqlalchemy import session
from designate.sqlalchemy import utils
@ -265,30 +266,39 @@ class SQLAlchemy(object):
except ValueError as value_error:
raise exceptions.ValueError(six.text_type(value_error))
def _find_recordsets_with_records(
self, context, table, cls,
list_cls, exc_notfound, criterion,
one=False, marker=None, limit=None, sort_key=None,
sort_dir=None, query=None, apply_tenant_criteria=True,
load_relations=False, relation_table=None, relation_cls=None,
relation_list_cls=None, relation_not_found_exc=None):
def _find_recordsets_with_records(self, context, criterion, zones_table,
recordsets_table, records_table,
one=False, marker=None, limit=None,
sort_key=None, sort_dir=None, query=None,
apply_tenant_criteria=True):
sort_key = sort_key or 'created_at'
sort_dir = sort_dir or 'asc'
data = criterion.pop('data', None)
status = criterion.pop('status', None)
filtering_records = data or status
# Join the 2 required tables
rjoin = table.outerjoin(
relation_table,
relation_table.c.recordset_id == table.c.id)
rzjoin = recordsets_table.join(
zones_table,
recordsets_table.c.zone_id == zones_table.c.id)
inner_q = select([table.c.id])
if filtering_records:
rzjoin = rzjoin.join(
records_table,
recordsets_table.c.id == records_table.c.recordset_id)
inner_q = select([recordsets_table.c.id]).select_from(rzjoin).\
where(zones_table.c.deleted == '0')
count_q = select([func.count(distinct(recordsets_table.c.id))]).\
select_from(rzjoin).where(zones_table.c.deleted == '0')
if marker is not None:
marker = utils.check_marker(table, marker, self.session)
marker = utils.check_marker(recordsets_table, marker,
self.session)
try:
inner_q = utils.paginate_query(
inner_q, table, limit,
inner_q, recordsets_table, limit,
[sort_key, 'id'], marker=marker,
sort_dir=sort_dir)
@ -301,8 +311,27 @@ class SQLAlchemy(object):
except ValueError as value_error:
raise exceptions.ValueError(six.text_type(value_error))
inner_q = self._apply_criterion(table, inner_q, criterion)
inner_q = self._apply_deleted_criteria(context, table, inner_q)
if apply_tenant_criteria:
inner_q = self._apply_tenant_criteria(context, recordsets_table,
inner_q)
count_q = self._apply_tenant_criteria(context, recordsets_table,
count_q)
inner_q = self._apply_criterion(recordsets_table, inner_q, criterion)
count_q = self._apply_criterion(recordsets_table, count_q, criterion)
if filtering_records:
records_criterion = dict((k, v) for k, v in (
('data', data), ('status', status)) if v is not None)
inner_q = self._apply_criterion(records_table, inner_q,
records_criterion)
count_q = self._apply_criterion(records_table, count_q,
records_criterion)
inner_q = self._apply_deleted_criteria(context, recordsets_table,
inner_q)
count_q = self._apply_deleted_criteria(context, recordsets_table,
count_q)
# Get the list of IDs needed.
# This is a separate call due to
@ -311,48 +340,57 @@ class SQLAlchemy(object):
inner_rproxy = self.session.execute(inner_q)
ids = inner_rproxy.fetchall()
resultproxy = self.session.execute(count_q)
result = resultproxy.fetchone()
total_count = 0 if result is None else result[0]
# formatted_ids = [id[0] for id in ids]
formatted_ids = six.moves.map(operator.itemgetter(0), ids)
# Join the 2 required tables
rjoin = recordsets_table.outerjoin(
records_table,
records_table.c.recordset_id == recordsets_table.c.id)
query = select(
[
# RS Info
table.c.id, # 0 - RS ID
table.c.version, # 1 - RS Version
table.c.created_at, # 2 - RS Created
table.c.updated_at, # 3 - RS Updated
table.c.tenant_id, # 4 - RS Tenant
table.c.zone_id, # 5 - RS Zone
table.c.name, # 6 - RS Name
table.c.type, # 7 - RS Type
table.c.ttl, # 8 - RS TTL
table.c.description, # 9 - RS Desc
recordsets_table.c.id, # 0 - RS ID
recordsets_table.c.version, # 1 - RS Version
recordsets_table.c.created_at, # 2 - RS Created
recordsets_table.c.updated_at, # 3 - RS Updated
recordsets_table.c.tenant_id, # 4 - RS Tenant
recordsets_table.c.zone_id, # 5 - RS Zone
recordsets_table.c.name, # 6 - RS Name
recordsets_table.c.type, # 7 - RS Type
recordsets_table.c.ttl, # 8 - RS TTL
recordsets_table.c.description, # 9 - RS Desc
# R Info
relation_table.c.id, # 10 - R ID
relation_table.c.version, # 11 - R Version
relation_table.c.created_at, # 12 - R Created
relation_table.c.updated_at, # 13 - R Updated
relation_table.c.tenant_id, # 14 - R Tenant
relation_table.c.zone_id, # 15 - R Zone
relation_table.c.recordset_id, # 16 - R RSet
relation_table.c.data, # 17 - R Data
relation_table.c.description, # 18 - R Desc
relation_table.c.hash, # 19 - R Hash
relation_table.c.managed, # 20 - R Mngd Flg
relation_table.c.managed_plugin_name, # 21 - R Mngd Plg
relation_table.c.managed_resource_type, # 22 - R Mngd Type
relation_table.c.managed_resource_region, # 23 - R Mngd Rgn
relation_table.c.managed_resource_id, # 24 - R Mngd ID
relation_table.c.managed_tenant_id, # 25 - R Mngd T ID
relation_table.c.status, # 26 - R Status
relation_table.c.action, # 27 - R Action
relation_table.c.serial # 28 - R Serial
records_table.c.id, # 10 - R ID
records_table.c.version, # 11 - R Version
records_table.c.created_at, # 12 - R Created
records_table.c.updated_at, # 13 - R Updated
records_table.c.tenant_id, # 14 - R Tenant
records_table.c.zone_id, # 15 - R Zone
records_table.c.recordset_id, # 16 - R RSet
records_table.c.data, # 17 - R Data
records_table.c.description, # 18 - R Desc
records_table.c.hash, # 19 - R Hash
records_table.c.managed, # 20 - R Mngd Flg
records_table.c.managed_plugin_name, # 21 - R Mngd Plg
records_table.c.managed_resource_type, # 22 - R Mngd Type
records_table.c.managed_resource_region, # 23 - R Mngd Rgn
records_table.c.managed_resource_id, # 24 - R Mngd ID
records_table.c.managed_tenant_id, # 25 - R Mngd T ID
records_table.c.status, # 26 - R Status
records_table.c.action, # 27 - R Action
records_table.c.serial # 28 - R Serial
]).\
select_from(
rjoin
).\
where(
table.c.id.in_(formatted_ids)
recordsets_table.c.id.in_(formatted_ids)
)
# These make looking up indexes for the Raw Rows much easier,
@ -393,7 +431,8 @@ class SQLAlchemy(object):
"serial": 28,
}
query, sort_dirs = utils.sort_query(query, table, [sort_key, 'id'],
query, sort_dirs = utils.sort_query(query, recordsets_table,
[sort_key, 'id'],
sort_dir=sort_dir)
try:
@ -406,7 +445,7 @@ class SQLAlchemy(object):
except ValueError as value_error:
raise exceptions.ValueError(six.text_type(value_error))
rrsets = list_cls()
rrsets = objects.RecordSetList()
rrset_id = None
current_rrset = None
@ -417,7 +456,7 @@ class SQLAlchemy(object):
# If this isn't the first iteration
rrsets.append(current_rrset)
# Set up a new rrset
current_rrset = cls()
current_rrset = objects.RecordSet()
rrset_id = record[rs_map['id']]
@ -426,10 +465,10 @@ class SQLAlchemy(object):
for key, value in rs_map.items():
setattr(current_rrset, key, record[value])
current_rrset.records = relation_list_cls()
current_rrset.records = objects.RecordList()
if record[r_map['id']] is not None:
rrdata = relation_cls()
rrdata = objects.Record()
for key, value in r_map.items():
setattr(rrdata, key, record[value])
@ -439,7 +478,7 @@ class SQLAlchemy(object):
else:
# We've already got an rrset, add the rdata
if record[r_map['id']] is not None:
rrdata = relation_cls()
rrdata = objects.Record()
for key, value in r_map.items():
setattr(rrdata, key, record[value])
@ -452,7 +491,7 @@ class SQLAlchemy(object):
if current_rrset is not None:
rrsets.append(current_rrset)
return rrsets
return total_count, rrsets
def _update(self, context, table, obj, exc_dup, exc_notfound,
skip_values=None):

View File

@ -630,15 +630,12 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
recordsets.obj_reset_changes(['records'])
else:
recordsets = self._find_recordsets_with_records(
context, tables.recordsets, objects.RecordSet,
objects.RecordSetList, exceptions.RecordSetNotFound, criterion,
load_relations=True, relation_table=tables.records,
relation_cls=objects.Record,
relation_list_cls=objects.RecordList, limit=limit,
marker=marker, sort_key=sort_key, sort_dir=sort_dir)
tc, recordsets = self._find_recordsets_with_records(
context, criterion, tables.zones, tables.recordsets,
tables.records, limit=limit, marker=marker,
sort_key=sort_key, sort_dir=sort_dir)
recordsets.total_count = self.count_recordsets(context, criterion)
recordsets.total_count = tc
return recordsets

View File

@ -86,6 +86,25 @@ class RecordsetTest(DesignateV2Test):
self.assertEqual(200, resp.status)
self.assertGreater(len(model.recordsets), 0)
def test_list_recordsets_with_filtering(self):
# This test ensures the behavior in bug #1561746 won't happen
post_model = datagen.random_a_recordset(self.zone.name,
ip='192.168.1.2')
self.useFixture(RecordsetFixture(self.zone.id, post_model))
for i in range(1, 3):
post_model = datagen.random_a_recordset(self.zone.name,
ip='10.0.1.{}'.format(i))
self.useFixture(RecordsetFixture(self.zone.id, post_model))
# Add limit in filter to make response paginated
filters = {"data": "10.*", "limit": 2}
resp, model = RecordsetClient.as_user('default') \
.list_recordsets(self.zone.id, filters=filters)
self.assertEqual(200, resp.status)
self.assertEqual(2, model.metadata.total_count)
self.assertEqual(len(model.recordsets), 2)
self.assertIsNotNone(model.links.next)
def assert_dns(self, model):
results = dnsclient.query_servers(model.name, model.type)