diff --git a/functionaltests/api/v2/test_recordset.py b/functionaltests/api/v2/test_recordset.py index 3e0f4a82..d335d739 100644 --- a/functionaltests/api/v2/test_recordset.py +++ b/functionaltests/api/v2/test_recordset.py @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. """ -from tempest_lib.exceptions import RestClientException +from tempest_lib import exceptions from functionaltests.common import datagen from functionaltests.common import utils @@ -47,9 +47,19 @@ class RecordsetTest(DesignateV2Test): make_recordset=lambda z: datagen.random_cname_recordset(z.name)), 'MX': dict( make_recordset=lambda z: datagen.random_mx_recordset(z.name)), + 'SPF': dict( + make_recordset=lambda z: datagen.random_spf_recordset(z.name)), + 'SRV': dict( + make_recordset=lambda z: datagen.random_srv_recordset(z.name)), + 'SSHFP': dict( + make_recordset=lambda z: datagen.random_sshfp_recordset(z.name)), + 'TXT': dict( + make_recordset=lambda z: datagen.random_txt_recordset(z.name)), + }) def test_crud_recordset(self, make_recordset): post_model = make_recordset(self.zone) + resp, post_resp_model = RecordsetClient.as_user('default') \ .post_recordset(self.zone.id, post_model) self.assertEqual(resp.status, 202, "on post response") @@ -81,6 +91,75 @@ class RecordsetTest(DesignateV2Test): RecordsetClient.as_user('default').wait_for_404( self.zone.id, recordset_id) + @utils.parameterized({ + 'A': dict( + make_recordset=lambda z: datagen.random_a_recordset(z.name)), + 'AAAA': dict( + make_recordset=lambda z: datagen.random_aaaa_recordset(z.name)), + 'CNAME': dict( + make_recordset=lambda z: datagen.random_cname_recordset(z.name)), + 'MX': dict( + make_recordset=lambda z: datagen.random_mx_recordset(z.name)), + 'SPF': dict( + make_recordset=lambda z: datagen.random_spf_recordset(z.name)), + 'SRV': dict( + make_recordset=lambda z: datagen.random_srv_recordset(z.name)), + 'SSHFP': dict( + make_recordset=lambda z: datagen.random_sshfp_recordset(z.name)), + 'TXT': dict( + make_recordset=lambda z: datagen.random_txt_recordset(z.name)), + }) + def test_create_invalid(self, make_recordset, data=None): + data = data or ["b0rk"] + + client = RecordsetClient.as_user('default') + + for i in data: + model = make_recordset(self.zone) + model.data = i + self._assert_exception( + exceptions.BadRequest, 'invalid_object', 400, + client.post_recordset, self.zone.id, model) + + @utils.parameterized({ + 'A': dict( + make_recordset=lambda z: datagen.random_a_recordset(z.name)), + 'AAAA': dict( + make_recordset=lambda z: datagen.random_aaaa_recordset(z.name)), + 'CNAME': dict( + make_recordset=lambda z: datagen.random_cname_recordset(z.name)), + 'MX': dict( + make_recordset=lambda z: datagen.random_mx_recordset(z.name)), + 'SPF': dict( + make_recordset=lambda z: datagen.random_spf_recordset(z.name)), + 'SRV': dict( + make_recordset=lambda z: datagen.random_srv_recordset(z.name)), + 'SSHFP': dict( + make_recordset=lambda z: datagen.random_sshfp_recordset(z.name)), + 'TXT': dict( + make_recordset=lambda z: datagen.random_txt_recordset(z.name)), + }) + def test_update_invalid(self, make_recordset, data=None): + data = data or ["b0rk"] + + post_model = make_recordset(self.zone) + + client = RecordsetClient.as_user('default') + resp, post_resp_model = client.post_recordset( + self.zone.id, post_model) + + recordset_id = post_resp_model.id + + client.wait_for_recordset( + self.zone.id, recordset_id) + + for i in data: + model = make_recordset(self.zone) + model.data = i + self._assert_exception( + exceptions.BadRequest, 'invalid_object', 400, + client.put_recordset, self.zone.id, recordset_id, model) + class RecordsetOwnershipTest(DesignateV2Test): @@ -95,13 +174,13 @@ class RecordsetOwnershipTest(DesignateV2Test): # try with name=A123456.zone.com. recordset = datagen.random_a_recordset(zone_name=zone.name) - self.assertRaises(RestClientException, + self.assertRaises(exceptions.RestClientException, lambda: RecordsetClient.as_user('alt') .post_recordset(zone.id, recordset)) # try with name=zone.com. recordset.name = zone.name - self.assertRaises(RestClientException, + self.assertRaises(exceptions.RestClientException, lambda: RecordsetClient.as_user('alt') .post_recordset(zone.id, recordset)) @@ -114,7 +193,7 @@ class RecordsetOwnershipTest(DesignateV2Test): zone_data.name = 'a.b.c.' + zone_data.name resp, zone = ZoneClient.as_user('default').post_zone(zone_data) - self.assertRaises(RestClientException, + self.assertRaises(exceptions.RestClientException, lambda: RecordsetClient.as_user('alt') .post_recordset(zone.id, recordset)) @@ -126,9 +205,9 @@ class RecordsetOwnershipTest(DesignateV2Test): # alt attempts to create record with name A12345.{zone} recordset = datagen.random_a_recordset(zone_name=zone.name) - self.assertRaises(RestClientException, + self.assertRaises(exceptions.RestClientException, lambda: RecordsetClient.as_user('alt') .post_recordset(zone.id, recordset)) - self.assertRaises(RestClientException, + self.assertRaises(exceptions.RestClientException, lambda: RecordsetClient.as_user('alt') .post_recordset(alt_zone.id, recordset)) diff --git a/functionaltests/common/datagen.py b/functionaltests/common/datagen.py index 10fe32d9..9551cadd 100644 --- a/functionaltests/common/datagen.py +++ b/functionaltests/common/datagen.py @@ -143,3 +143,31 @@ def random_zonefile_data(name=None, ttl=None): ttl = str(random.randint(1200, 8400)) return zone_base.replace('&', name).replace('#', ttl) + + +def random_spf_recordset(zone_name, data=None): + data = data or "v=spf1 +all" + return random_recordset_data('SPF', zone_name, records=[data]) + + +def random_srv_recordset(zone_name, data=None): + data = data or "10 0 8080 %s.%s" % (random_string(), zone_name) + return random_recordset_data('SRV', zone_name, + name="_sip._tcp.%s" % zone_name, + records=[data]) + + +def random_sshfp_recordset(zone_name, algorithm_number=None, + fingerprint_type=None, fingerprint=None): + algorithm_number = algorithm_number or 2 + fingerprint_type = fingerprint_type or 1 + fingerprint = fingerprint or \ + "123456789abcdef67890123456789abcdef67890" + + data = "%s %s %s" % (algorithm_number, fingerprint_type, fingerprint) + return random_recordset_data('SSHFP', zone_name, records=[data]) + + +def random_txt_recordset(zone_name, data=None): + data = data or "v=spf1 +all" + return random_recordset_data('TXT', zone_name, records=[data])