Add functional tests that query for wildcard records

Change-Id: I61131b4b60562b7cd0812e8f542f587165e04380
This commit is contained in:
Paul Glass 2015-08-27 21:23:13 +00:00
parent 7ac4cb4759
commit f27e662cbe

@ -44,25 +44,25 @@ RECORDSETS_DATASET = {
}
WILDCARD_RECORDSETS_DATASET = {
'wildcard_A': dict(make_recordset=lambda z:
'A': dict(make_recordset=lambda z:
datagen.random_a_recordset(zone_name=z.name,
name="*.{0}".format(z.name))),
'wildcard_AAAA': dict(make_recordset=lambda z:
'AAAA': dict(make_recordset=lambda z:
datagen.random_aaaa_recordset(zone_name=z.name,
name="*.{0}".format(z.name))),
'wildcard_CNAME': dict(make_recordset=lambda z:
'CNAME': dict(make_recordset=lambda z:
datagen.random_cname_recordset(zone_name=z.name,
name="*.{0}".format(z.name))),
'wildcard_MX': dict(make_recordset=lambda z:
'MX': dict(make_recordset=lambda z:
datagen.random_mx_recordset(zone_name=z.name,
name="*.{0}".format(z.name))),
'wildcard_SPF': dict(make_recordset=lambda z:
'SPF': dict(make_recordset=lambda z:
datagen.random_spf_recordset(zone_name=z.name,
name="*.{0}".format(z.name))),
'wildcard_SSHFP': dict(make_recordset=lambda z:
'SSHFP': dict(make_recordset=lambda z:
datagen.random_sshfp_recordset(zone_name=z.name,
name="*.{0}".format(z.name))),
'wildcard_TXT': dict(make_recordset=lambda z:
'TXT': dict(make_recordset=lambda z:
datagen.random_txt_recordset(zone_name=z.name,
name="*.{0}".format(z.name))),
}
@ -106,9 +106,7 @@ class RecordsetTest(DesignateV2Test):
self.assertEqual(model_data, data)
@utils.parameterized(
dict(RECORDSETS_DATASET.items() + WILDCARD_RECORDSETS_DATASET.items())
)
@utils.parameterized(RECORDSETS_DATASET)
def test_crud_recordset(self, make_recordset):
post_model = make_recordset(self.zone)
@ -183,6 +181,27 @@ class RecordsetTest(DesignateV2Test):
exceptions.BadRequest, 'invalid_object', 400,
client.put_recordset, self.zone.id, recordset_id, model)
@utils.parameterized(WILDCARD_RECORDSETS_DATASET)
def test_can_create_and_query_wildcard_recordset(self, make_recordset):
client = RecordsetClient.as_user('default')
post_model = make_recordset(self.zone)
resp, post_resp_model = client.post_recordset(self.zone.id, post_model)
self.assertEqual(resp.status, 202)
recordset_id = post_resp_model.id
client.wait_for_recordset(self.zone.id, recordset_id)
verify_models = [
post_model.from_dict(post_model.to_dict()) for x in range(3)
]
verify_models[0].name = "abc.{0}".format(self.zone.name)
verify_models[1].name = "abc.def.{0}".format(self.zone.name)
verify_models[2].name = "abc.def.hij.{0}".format(self.zone.name)
for m in verify_models:
self.assert_dns(m)
class RecordsetOwnershipTest(DesignateV2Test):