Merge "Fix data filtering with pagination"
commit 0fa168520d
@@ -20,7 +20,6 @@ from designate import exceptions
 from designate import utils
 from designate.api.v2.controllers import rest
 from designate.objects import RecordSet
-from designate.objects import RecordSetList
 from designate.objects.adapters import DesignateAdapter
 from designate.i18n import _LI

@@ -68,40 +67,9 @@ class RecordSetsController(rest.RestController):

         criterion['zone_id'] = zone_id

-        # Data must be filtered separately, through the Records table
-        data = criterion.pop('data', None)
-        status = criterion.pop('status', None)
-
-        # Retrieve recordsets
         recordsets = self.central_api.find_recordsets(
             context, criterion, marker, limit, sort_key, sort_dir)

-        # 'data' filter param: only return recordsets with matching data
-        if data:
-            records = self.central_api.find_records(
-                context, criterion={'data': data, 'zone_id': zone_id})
-            recordset_with_data_ids = set(record.recordset_id
-                                          for record in records)
-
-            new_rsets = RecordSetList()
-
-            for recordset in recordsets:
-                if recordset.id in recordset_with_data_ids:
-                    new_rsets.append(recordset)
-
-            recordsets = new_rsets
-            recordsets.total_count = len(recordset_with_data_ids)
-
-        # 'status' filter param: only return recordsets with matching status
-        if status:
-            new_rsets = RecordSetList()
-
-            for recordset in recordsets:
-                if recordset.status == status:
-                    new_rsets.append(recordset)
-
-            recordsets = new_rsets
-
         LOG.info(_LI("Retrieved %(recordsets)s"), {'recordsets': recordsets})

         return DesignateAdapter.render('API_v2', recordsets, request=request)
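The deleted block filtered in Python after central had already applied marker/limit pagination, so a page could silently drop matching rows and total_count could disagree with what the filter actually matched. A minimal sketch of that failure mode, with hypothetical data rather than Designate objects:

    # Paginate first, filter afterwards: the page under-reports matches.
    recordsets = [{'id': i,
                   'data': '10.0.1.{}'.format(i) if i % 2 else '192.0.2.1'}
                  for i in range(10)]

    limit = 2
    page = recordsets[:limit]                   # storage paginated first ...
    matches = [r for r in page
               if r['data'].startswith('10.')]  # ... controller filtered later

    print(len(matches))  # 1, although 5 of the 10 recordsets match

The storage-layer changes below push both filters into the SQL itself, so the LIMIT, the pagination marker, and the count all see the same filtered set.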
@@ -22,9 +22,10 @@ from oslo_db.sqlalchemy import utils as oslodb_utils
 from oslo_db import exception as oslo_db_exception
 from oslo_log import log as logging
 from oslo_utils import timeutils
-from sqlalchemy import select, or_, between
+from sqlalchemy import select, or_, between, func, distinct

 from designate import exceptions
+from designate import objects
 from designate.sqlalchemy import session
 from designate.sqlalchemy import utils

@@ -265,30 +266,39 @@ class SQLAlchemy(object):
         except ValueError as value_error:
             raise exceptions.ValueError(six.text_type(value_error))

-    def _find_recordsets_with_records(
-            self, context, table, cls,
-            list_cls, exc_notfound, criterion,
-            one=False, marker=None, limit=None, sort_key=None,
-            sort_dir=None, query=None, apply_tenant_criteria=True,
-            load_relations=False, relation_table=None, relation_cls=None,
-            relation_list_cls=None, relation_not_found_exc=None):
+    def _find_recordsets_with_records(self, context, criterion, zones_table,
+                                      recordsets_table, records_table,
+                                      one=False, marker=None, limit=None,
+                                      sort_key=None, sort_dir=None, query=None,
+                                      apply_tenant_criteria=True):

         sort_key = sort_key or 'created_at'
         sort_dir = sort_dir or 'asc'
+        data = criterion.pop('data', None)
+        status = criterion.pop('status', None)
+        filtering_records = data or status

         # Join the 2 required tables
-        rjoin = table.outerjoin(
-            relation_table,
-            relation_table.c.recordset_id == table.c.id)
+        rzjoin = recordsets_table.join(
+            zones_table,
+            recordsets_table.c.zone_id == zones_table.c.id)
+
+        if filtering_records:
+            rzjoin = rzjoin.join(
+                records_table,
+                recordsets_table.c.id == records_table.c.recordset_id)

-        inner_q = select([table.c.id])
+        inner_q = select([recordsets_table.c.id]).select_from(rzjoin).\
+            where(zones_table.c.deleted == '0')
+        count_q = select([func.count(distinct(recordsets_table.c.id))]).\
+            select_from(rzjoin).where(zones_table.c.deleted == '0')

         if marker is not None:
-            marker = utils.check_marker(table, marker, self.session)
+            marker = utils.check_marker(recordsets_table, marker,
+                                        self.session)

         try:
             inner_q = utils.paginate_query(
-                inner_q, table, limit,
+                inner_q, recordsets_table, limit,
                 [sort_key, 'id'], marker=marker,
                 sort_dir=sort_dir)
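count_q runs alongside the paginated inner_q so the total is computed over the same joins and criteria. A self-contained sketch of the COUNT(DISTINCT ...) idiom against an in-memory SQLite database, written in the SQLAlchemy 1.x select([...]) style this file itself uses (the table and columns are illustrative only, not Designate's schema):

    from sqlalchemy import (Column, Integer, MetaData, String, Table,
                            create_engine, distinct, func, select)

    engine = create_engine('sqlite://')
    metadata = MetaData()
    recordsets = Table('recordsets', metadata,
                       Column('id', Integer, primary_key=True),
                       Column('name', String))
    metadata.create_all(engine)

    with engine.connect() as conn:
        conn.execute(recordsets.insert(),
                     [{'name': 'www'}, {'name': 'mail'}])
        # COUNT(DISTINCT id) mirrors count_q above: one count per recordset,
        # even when a join against records multiplies the rows.
        count_q = select([func.count(distinct(recordsets.c.id))])
        print(conn.execute(count_q).fetchone()[0])  # 2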
@@ -301,8 +311,27 @@ class SQLAlchemy(object):
         except ValueError as value_error:
             raise exceptions.ValueError(six.text_type(value_error))

-        inner_q = self._apply_criterion(table, inner_q, criterion)
-        inner_q = self._apply_deleted_criteria(context, table, inner_q)
+        if apply_tenant_criteria:
+            inner_q = self._apply_tenant_criteria(context, recordsets_table,
+                                                  inner_q)
+            count_q = self._apply_tenant_criteria(context, recordsets_table,
+                                                  count_q)
+
+        inner_q = self._apply_criterion(recordsets_table, inner_q, criterion)
+        count_q = self._apply_criterion(recordsets_table, count_q, criterion)
+
+        if filtering_records:
+            records_criterion = dict((k, v) for k, v in (
+                ('data', data), ('status', status)) if v is not None)
+            inner_q = self._apply_criterion(records_table, inner_q,
+                                            records_criterion)
+            count_q = self._apply_criterion(records_table, count_q,
+                                            records_criterion)
+
+        inner_q = self._apply_deleted_criteria(context, recordsets_table,
+                                               inner_q)
+        count_q = self._apply_deleted_criteria(context, recordsets_table,
+                                               count_q)

         # Get the list of IDs needed.
         # This is a separate call due to
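The records_criterion expression keeps only the record-level filters the caller actually supplied, so an absent status filter never constrains the query. A quick standalone check of that idiom:

    # Only non-None filters survive; here only 'data' was supplied.
    data, status = '10.*', None
    records_criterion = dict((k, v) for k, v in (
        ('data', data), ('status', status)) if v is not None)
    print(records_criterion)  # {'data': '10.*'}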
@@ -311,48 +340,57 @@ class SQLAlchemy(object):
         inner_rproxy = self.session.execute(inner_q)
         ids = inner_rproxy.fetchall()

+        resultproxy = self.session.execute(count_q)
+        result = resultproxy.fetchone()
+        total_count = 0 if result is None else result[0]
+
         # formatted_ids = [id[0] for id in ids]
         formatted_ids = six.moves.map(operator.itemgetter(0), ids)

+        # Join the 2 required tables
+        rjoin = recordsets_table.outerjoin(
+            records_table,
+            records_table.c.recordset_id == recordsets_table.c.id)
+
         query = select(
             [
                 # RS Info
-                table.c.id,  # 0 - RS ID
-                table.c.version,  # 1 - RS Version
-                table.c.created_at,  # 2 - RS Created
-                table.c.updated_at,  # 3 - RS Updated
-                table.c.tenant_id,  # 4 - RS Tenant
-                table.c.zone_id,  # 5 - RS Zone
-                table.c.name,  # 6 - RS Name
-                table.c.type,  # 7 - RS Type
-                table.c.ttl,  # 8 - RS TTL
-                table.c.description,  # 9 - RS Desc
+                recordsets_table.c.id,  # 0 - RS ID
+                recordsets_table.c.version,  # 1 - RS Version
+                recordsets_table.c.created_at,  # 2 - RS Created
+                recordsets_table.c.updated_at,  # 3 - RS Updated
+                recordsets_table.c.tenant_id,  # 4 - RS Tenant
+                recordsets_table.c.zone_id,  # 5 - RS Zone
+                recordsets_table.c.name,  # 6 - RS Name
+                recordsets_table.c.type,  # 7 - RS Type
+                recordsets_table.c.ttl,  # 8 - RS TTL
+                recordsets_table.c.description,  # 9 - RS Desc
                 # R Info
-                relation_table.c.id,  # 10 - R ID
-                relation_table.c.version,  # 11 - R Version
-                relation_table.c.created_at,  # 12 - R Created
-                relation_table.c.updated_at,  # 13 - R Updated
-                relation_table.c.tenant_id,  # 14 - R Tenant
-                relation_table.c.zone_id,  # 15 - R Zone
-                relation_table.c.recordset_id,  # 16 - R RSet
-                relation_table.c.data,  # 17 - R Data
-                relation_table.c.description,  # 18 - R Desc
-                relation_table.c.hash,  # 19 - R Hash
-                relation_table.c.managed,  # 20 - R Mngd Flg
-                relation_table.c.managed_plugin_name,  # 21 - R Mngd Plg
-                relation_table.c.managed_resource_type,  # 22 - R Mngd Type
-                relation_table.c.managed_resource_region,  # 23 - R Mngd Rgn
-                relation_table.c.managed_resource_id,  # 24 - R Mngd ID
-                relation_table.c.managed_tenant_id,  # 25 - R Mngd T ID
-                relation_table.c.status,  # 26 - R Status
-                relation_table.c.action,  # 27 - R Action
-                relation_table.c.serial  # 28 - R Serial
+                records_table.c.id,  # 10 - R ID
+                records_table.c.version,  # 11 - R Version
+                records_table.c.created_at,  # 12 - R Created
+                records_table.c.updated_at,  # 13 - R Updated
+                records_table.c.tenant_id,  # 14 - R Tenant
+                records_table.c.zone_id,  # 15 - R Zone
+                records_table.c.recordset_id,  # 16 - R RSet
+                records_table.c.data,  # 17 - R Data
+                records_table.c.description,  # 18 - R Desc
+                records_table.c.hash,  # 19 - R Hash
+                records_table.c.managed,  # 20 - R Mngd Flg
+                records_table.c.managed_plugin_name,  # 21 - R Mngd Plg
+                records_table.c.managed_resource_type,  # 22 - R Mngd Type
+                records_table.c.managed_resource_region,  # 23 - R Mngd Rgn
+                records_table.c.managed_resource_id,  # 24 - R Mngd ID
+                records_table.c.managed_tenant_id,  # 25 - R Mngd T ID
+                records_table.c.status,  # 26 - R Status
+                records_table.c.action,  # 27 - R Action
+                records_table.c.serial  # 28 - R Serial
             ]).\
             select_from(
                 rjoin
             ).\
             where(
-                table.c.id.in_(formatted_ids)
+                recordsets_table.c.id.in_(formatted_ids)
             )

         # These make looking up indexes for the Raw Rows much easier,
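The shape here is a two-step fetch: the paginated inner query yields one page of recordset IDs, and only then is the records table outer-joined for those IDs, so the LIMIT counts recordsets rather than joined rows. An illustrative standalone version with sqlite3 (the schema is hypothetical):

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.executescript("""
        CREATE TABLE recordsets (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE records (id INTEGER PRIMARY KEY, recordset_id INTEGER,
                              data TEXT);
        INSERT INTO recordsets VALUES (1, 'www'), (2, 'mail'), (3, 'ftp');
        INSERT INTO records VALUES (1, 1, '10.0.0.1'), (2, 1, '10.0.0.2'),
                                   (3, 2, '10.0.0.3');
    """)

    # Step 1: LIMIT applies to recordset IDs, not to joined rows.
    page_ids = [row[0] for row in conn.execute(
        "SELECT id FROM recordsets ORDER BY id LIMIT 2")]

    # Step 2: outer-join records only for the IDs on this page.
    placeholders = ",".join(["?"] * len(page_ids))
    rows = conn.execute(
        "SELECT rs.id, rs.name, r.data FROM recordsets rs "
        "LEFT OUTER JOIN records r ON r.recordset_id = rs.id "
        "WHERE rs.id IN (%s) ORDER BY rs.id" % placeholders,
        page_ids).fetchall()
    print(rows)
    # [(1, 'www', '10.0.0.1'), (1, 'www', '10.0.0.2'), (2, 'mail', '10.0.0.3')]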
@@ -393,7 +431,8 @@ class SQLAlchemy(object):
             "serial": 28,
         }

-        query, sort_dirs = utils.sort_query(query, table, [sort_key, 'id'],
+        query, sort_dirs = utils.sort_query(query, recordsets_table,
+                                            [sort_key, 'id'],
                                             sort_dir=sort_dir)

         try:
@@ -406,7 +445,7 @@ class SQLAlchemy(object):
         except ValueError as value_error:
             raise exceptions.ValueError(six.text_type(value_error))

-        rrsets = list_cls()
+        rrsets = objects.RecordSetList()
         rrset_id = None
         current_rrset = None

@@ -417,7 +456,7 @@ class SQLAlchemy(object):
                 # If this isn't the first iteration
                 rrsets.append(current_rrset)
                 # Set up a new rrset
-                current_rrset = cls()
+                current_rrset = objects.RecordSet()

                 rrset_id = record[rs_map['id']]

@@ -426,10 +465,10 @@ class SQLAlchemy(object):
                 for key, value in rs_map.items():
                     setattr(current_rrset, key, record[value])

-                current_rrset.records = relation_list_cls()
+                current_rrset.records = objects.RecordList()

                 if record[r_map['id']] is not None:
-                    rrdata = relation_cls()
+                    rrdata = objects.Record()

                     for key, value in r_map.items():
                         setattr(rrdata, key, record[value])
@@ -439,7 +478,7 @@ class SQLAlchemy(object):
             else:
                 # We've already got an rrset, add the rdata
                 if record[r_map['id']] is not None:
-                    rrdata = relation_cls()
+                    rrdata = objects.Record()

                     for key, value in r_map.items():
                         setattr(rrdata, key, record[value])
@@ -452,7 +491,7 @@ class SQLAlchemy(object):
         if current_rrset is not None:
             rrsets.append(current_rrset)

-        return rrsets
+        return total_count, rrsets

     def _update(self, context, table, obj, exc_dup, exc_notfound,
                 skip_values=None):
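Conceptually, the loop above folds consecutive sorted rows sharing a recordset ID into one recordset holding a list of records; itertools.groupby expresses the same idea compactly (plain dicts stand in for the Designate objects):

    import itertools

    # (recordset_id, record_data) pairs, already sorted by recordset_id; None
    # stands in for a recordset with no records (from the outer join).
    rows = [(1, '10.0.0.1'), (1, '10.0.0.2'), (2, '10.0.0.3'), (3, None)]

    rrsets = [{'id': rs_id,
               'records': [d for _, d in group if d is not None]}
              for rs_id, group in itertools.groupby(rows, key=lambda r: r[0])]

    print(rrsets)
    # [{'id': 1, 'records': ['10.0.0.1', '10.0.0.2']},
    #  {'id': 2, 'records': ['10.0.0.3']},
    #  {'id': 3, 'records': []}]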
@@ -630,15 +630,12 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
             recordsets.obj_reset_changes(['records'])

         else:
-            recordsets = self._find_recordsets_with_records(
-                context, tables.recordsets, objects.RecordSet,
-                objects.RecordSetList, exceptions.RecordSetNotFound, criterion,
-                load_relations=True, relation_table=tables.records,
-                relation_cls=objects.Record,
-                relation_list_cls=objects.RecordList, limit=limit,
-                marker=marker, sort_key=sort_key, sort_dir=sort_dir)
+            tc, recordsets = self._find_recordsets_with_records(
+                context, criterion, tables.zones, tables.recordsets,
+                tables.records, limit=limit, marker=marker,
+                sort_key=sort_key, sort_dir=sort_dir)

-            recordsets.total_count = self.count_recordsets(context, criterion)
+            recordsets.total_count = tc

         return recordsets

@@ -86,6 +86,25 @@ class RecordsetTest(DesignateV2Test):
         self.assertEqual(200, resp.status)
         self.assertGreater(len(model.recordsets), 0)

+    def test_list_recordsets_with_filtering(self):
+        # This test ensures the behavior in bug #1561746 won't happen
+        post_model = datagen.random_a_recordset(self.zone.name,
+                                                ip='192.168.1.2')
+        self.useFixture(RecordsetFixture(self.zone.id, post_model))
+        for i in range(1, 3):
+            post_model = datagen.random_a_recordset(self.zone.name,
+                                                    ip='10.0.1.{}'.format(i))
+            self.useFixture(RecordsetFixture(self.zone.id, post_model))
+
+        # Add limit in filter to make response paginated
+        filters = {"data": "10.*", "limit": 2}
+        resp, model = RecordsetClient.as_user('default') \
+            .list_recordsets(self.zone.id, filters=filters)
+        self.assertEqual(200, resp.status)
+        self.assertEqual(2, model.metadata.total_count)
+        self.assertEqual(len(model.recordsets), 2)
+        self.assertIsNotNone(model.links.next)
+
     def assert_dns(self, model):
         results = dnsclient.query_servers(model.name, model.type)
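The test builds three recordsets, of which only the two 10.0.1.x ones match the data filter; with limit=2 it asserts total_count is 2, the page holds 2 recordsets, and the response still carries a next link. Designate criteria treat '*' as a wildcard (roughly a LIKE-style match); fnmatch reproduces the same selection here:

    from fnmatch import fnmatch

    data = ['192.168.1.2', '10.0.1.1', '10.0.1.2']
    print([d for d in data if fnmatch(d, '10.*')])  # ['10.0.1.1', '10.0.1.2']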