Cleaned up and fixed record objects and tests

We are currently raising the wrong exceptions for a few record
types. This can cause only part of the failures to get raised
to the user, since the first exception will fail the process.
Ideally we want to inform the user of all invalid records.

This patch fixes this and cleans up some of code as well.
- Cleaned up objects to use consistent exceptions.
- Fixed all broken tests.
- Re-named various objects tests.

Change-Id: I3d54a47f73929377122249c4d971e70e553725a6
This commit is contained in:
Erik Olof Gunnar Andersson 2022-07-14 17:06:09 -07:00
parent 3423a3e656
commit d10056178a
12 changed files with 229 additions and 89 deletions

@ -16,12 +16,21 @@
import base64
from designate.exceptions import InvalidObject
from designate.objects import base
from designate.objects import fields
from designate.objects.record import Record
from designate.objects.record import RecordList
VALID_ALGOS = [
'RSAMD5', 'DSA', 'RSASHA1', 'DSA-NSEC3-SHA1', 'RSASHA1-NSEC3-SHA1',
'RSASHA256', 'RSASHA512', 'ECC-GOST', 'ECDSAP256SHA256', 'ECDSAP384SHA384',
'ED25519', 'ED448'
]
VALID_CERTS = [
'PKIX', 'SPKI', 'PGP', 'IPKIX', 'ISPKI', 'IPGP', 'ACPKIX', 'IACPKIX',
'URI', 'OID', 'DPKIX', 'DPTR'
]
@base.DesignateRegistry.register
class CERT(Record):
@ -36,39 +45,42 @@ class CERT(Record):
'certificate': fields.StringFields(),
}
def validate_cert_type(self, cert_type):
@staticmethod
def validate_cert_type(cert_type):
if cert_type in VALID_CERTS:
return cert_type
try:
int_cert_type = int(cert_type)
if int_cert_type < 0 or int_cert_type > 65535:
err = ("Cert type value should be between 0 and 65535")
raise InvalidObject(err)
except ValueError:
# cert type is specified as Mnemonic
VALID_CERTS = ['PKIX', 'SPKI', 'PGP', 'IPKIX', 'ISPKI', 'IPGP',
'ACPKIX', 'IACPKIX', 'URI', 'OID', 'DPKIX', 'DPTR']
if cert_type not in VALID_CERTS:
err = ("Cert type is not valid Mnemonic.")
raise InvalidObject(err)
raise ValueError('Cert type is not valid Mnemonic.')
if int_cert_type < 0 or int_cert_type > 65535:
raise ValueError(
'Cert type value should be between 0 and 65535'
)
return cert_type
def validate_cert_algo(self, cert_algo):
@staticmethod
def validate_cert_algo(cert_algo):
if cert_algo in VALID_ALGOS:
return cert_algo
try:
int_cert_algo = int(cert_algo)
if int_cert_algo < 0 or int_cert_algo > 255:
err = ("Cert algorithm value should be between 0 and 255")
raise InvalidObject(err)
except ValueError:
# cert algo is specified as Mnemonic
VALID_ALGOS = ['RSAMD5', 'DSA', 'RSASHA1', 'DSA-NSEC3-SHA1',
'RSASHA1-NSEC3-SHA1', 'RSASHA256', 'RSASHA512',
'ECC-GOST', 'ECDSAP256SHA256', 'ECDSAP384SHA384',
'ED25519', 'ED448']
if cert_algo not in VALID_ALGOS:
err = ("Cert algorithm is not valid Mnemonic.")
raise InvalidObject(err)
raise ValueError('Cert algorithm is not valid Mnemonic.')
if int_cert_algo < 0 or int_cert_algo > 255:
raise ValueError(
'Cert algorithm value should be between 0 and 255'
)
return cert_algo
def validate_cert_certificate(self, certificate):
@staticmethod
def validate_cert_certificate(certificate):
try:
chunks = certificate.split(' ')
encoded_chunks = []
@ -77,13 +89,11 @@ class CERT(Record):
b64 = b''.join(encoded_chunks)
base64.b64decode(b64)
except Exception:
err = ("Cert certificate is not valid.")
raise InvalidObject(err)
raise ValueError('Cert certificate is not valid.')
return certificate
def _to_string(self):
return ("%(cert_type)s %(key_tag)s %(cert_algo)s "
"%(certificate)s" % self)
return '%(cert_type)s %(key_tag)s %(cert_algo)s %(certificate)s' % self
def _from_string(self, v):
cert_type, key_tag, cert_algo, certificate = v.split(' ', 3)

@ -12,7 +12,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.exceptions import InvalidObject
from designate.objects import base
from designate.objects import fields
from designate.objects.record import Record
@ -33,22 +32,23 @@ class SPF(Record):
return self.txt_data
def _from_string(self, value):
if (not value.startswith('"') and not value.endswith('"')):
if not value.startswith('"') and not value.endswith('"'):
# value with spaces should be quoted as per RFC1035 5.1
for element in value:
if element.isspace():
err = ("Empty spaces are not allowed in SPF record, "
"unless wrapped in double quotes.")
raise InvalidObject(err)
raise ValueError(
'Empty spaces are not allowed in SPF record, '
'unless wrapped in double quotes.'
)
else:
# quotes within value should be escaped with backslash
strip_value = value.strip('"')
for index, char in enumerate(strip_value):
if char == '"':
if strip_value[index - 1] != "\\":
err = ("Quotation marks should be escaped with "
"backslash.")
raise InvalidObject(err)
raise ValueError(
'Quotation marks should be escaped with backslash.'
)
self.txt_data = value

@ -12,7 +12,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.exceptions import InvalidObject
from designate.objects import base
from designate.objects import fields
from designate.objects.record import Record
@ -43,31 +42,34 @@ class TXT(Record):
def _validate_record_single_string(self, value):
if len(value) > 255:
err = ("Any TXT record string exceeding "
"255 characters has to be split.")
raise InvalidObject(err)
raise ValueError(
'Any TXT record string exceeding 255 characters has to be '
'split.'
)
if self._is_missing_double_quote(value):
err = ("TXT record is missing a double quote either at beginning "
"or at end.")
raise InvalidObject(err)
raise ValueError(
'TXT record is missing a double quote either at beginning '
'or at end.'
)
if not self._is_wrapped_in_double_quotes(value):
# value with spaces should be quoted as per RFC1035 5.1
for element in value:
if element.isspace():
err = ("Empty spaces are not allowed in TXT record, "
"unless wrapped in double quotes.")
raise InvalidObject(err)
raise ValueError(
'Empty spaces are not allowed in TXT record, '
'unless wrapped in double quotes.'
)
else:
# quotes within value should be escaped with backslash
strip_value = value.strip('"')
for index, char in enumerate(strip_value):
if char == '"':
if strip_value[index - 1] != "\\":
err = ("Quotation marks should be escaped with "
"backslash.")
raise InvalidObject(err)
raise ValueError(
'Quotation marks should be escaped with backslash.'
)
def _from_string(self, value):
if len(value) > 255:
@ -76,10 +78,10 @@ class TXT(Record):
stripped_value = value.strip('"')
if (not self._is_wrapped_in_double_quotes(value) and
'" "' not in stripped_value):
err = ("TXT record strings over 255 characters "
"have to be split into multiple strings "
"wrapped in double quotes.")
raise InvalidObject(err)
raise ValueError(
'TXT record strings over 255 characters have to be split '
'into multiple strings wrapped in double quotes.'
)
record_strings = stripped_value.split('" "')
for record_string in record_strings:

@ -23,10 +23,50 @@ LOG = logging.getLogger(__name__)
class RRDataATest(oslotest.base.BaseTestCase):
def test_reject_leading_zeros(self):
record = objects.A(data='10.0.001.1')
def test_valid_a_record(self):
recordset = objects.RecordSet(
name='www.example.test.', type='A',
records=objects.RecordList(objects=[
objects.Record(data='192.168.0.1'),
])
)
recordset.validate()
def test_reject_aaaa_record(self):
recordset = objects.RecordSet(
name='www.example.test.', type='A',
records=objects.RecordList(objects=[
objects.Record(data='2001:db8:0:1::1'),
])
)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
record.validate
recordset.validate
)
def test_reject_invalid_data(self):
recordset = objects.RecordSet(
name='www.example.test.', type='A',
records=objects.RecordList(objects=[
objects.Record(data='TXT'),
])
)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
recordset.validate
)
def test_reject_leading_zeros(self):
recordset = objects.RecordSet(
name='www.example.test.', type='A',
records=objects.RecordList(objects=[
objects.Record(data='10.0.001.1'),
])
)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
recordset.validate
)

@ -0,0 +1,55 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import oslotest.base
from designate import exceptions
from designate import objects
LOG = logging.getLogger(__name__)
class RRDataAAAATest(oslotest.base.BaseTestCase):
def test_valid_aaaa_record(self):
recordset = objects.RecordSet(
name='www.example.test.', type='AAAA',
records=objects.RecordList(objects=[
objects.Record(data='2001:db8:0:1::1'),
])
)
recordset.validate()
def test_reject_a_record(self):
recordset = objects.RecordSet(
name='www.example.test.', type='AAAA',
records=objects.RecordList(objects=[
objects.Record(data='192.168.0.1'),
])
)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
recordset.validate
)
def test_reject_invalid_data(self):
recordset = objects.RecordSet(
name='www.example.test.', type='AAAA',
records=objects.RecordList(objects=[
objects.Record(data='TXT'),
])
)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
recordset.validate
)

@ -21,7 +21,7 @@ from designate import objects
LOG = logging.getLogger(__name__)
class CAARecordTest(oslotest.base.BaseTestCase):
class RRDataCAATest(oslotest.base.BaseTestCase):
def test_parse_caa_issue(self):
caa_record = objects.CAA()
caa_record._from_string('0 issue ca.example.net')

@ -17,28 +17,30 @@
from oslo_log import log as logging
import oslotest.base
from designate import exceptions
from designate import objects
LOG = logging.getLogger(__name__)
class CERTRecordTest(oslotest.base.BaseTestCase):
class RRDataCERTTest(oslotest.base.BaseTestCase):
def test_parse_cert(self):
cert_record = objects.CERT()
cert_record._from_string(
'DPKIX 1 RSASHA256 KR1L0GbocaIOOim1+qdHtOSrDcOsGiI2NCcxuX2/Tqc=') # noqa
'DPKIX 1 RSASHA256 KR1L0GbocaIOOim1+qdHtOSrDcOsGiI2NCcxuX2/Tqc='
)
self.assertEqual('DPKIX', cert_record.cert_type)
self.assertEqual(1, cert_record.key_tag)
self.assertEqual('RSASHA256', cert_record.cert_algo)
self.assertEqual('KR1L0GbocaIOOim1+qdHtOSrDcOsGiI2NCcxuX2/Tqc=',
cert_record.certificate)
self.assertEqual(
'KR1L0GbocaIOOim1+qdHtOSrDcOsGiI2NCcxuX2/Tqc=',
cert_record.certificate
)
def test_parse_invalid_cert_type_value(self):
cert_record = objects.CERT()
self.assertRaisesRegex(
exceptions.InvalidObject,
ValueError,
'Cert type value should be between 0 and 65535',
cert_record._from_string,
'99999 1 RSASHA256 KR1L0GbocaIOOim1+qdHtOSrDcOsGiI2NCcxuX2/Tqc='
@ -47,7 +49,7 @@ class CERTRecordTest(oslotest.base.BaseTestCase):
def test_parse_invalid_cert_type_mnemonic(self):
cert_record = objects.CERT()
self.assertRaisesRegex(
exceptions.InvalidObject,
ValueError,
'Cert type is not valid Mnemonic.',
cert_record._from_string,
'FAKETYPE 1 RSASHA256 KR1L0GbocaIOOim1+qdHtOSrDcOsGiI2NCcxuX2/Tqc='
@ -56,7 +58,7 @@ class CERTRecordTest(oslotest.base.BaseTestCase):
def test_parse_invalid_cert_algo_value(self):
cert_record = objects.CERT()
self.assertRaisesRegex(
exceptions.InvalidObject,
ValueError,
'Cert algorithm value should be between 0 and 255',
cert_record._from_string,
'DPKIX 1 256 KR1L0GbocaIOOim1+qdHtOSrDcOsGiI2NCcxuX2/Tqc='
@ -65,7 +67,7 @@ class CERTRecordTest(oslotest.base.BaseTestCase):
def test_parse_invalid_cert_algo_mnemonic(self):
cert_record = objects.CERT()
self.assertRaisesRegex(
exceptions.InvalidObject,
ValueError,
'Cert algorithm is not valid Mnemonic.',
cert_record._from_string,
'DPKIX 1 FAKESHA256 KR1L0GbocaIOOim1+qdHtOSrDcOsGiI2NCcxuX2/Tqc='
@ -74,7 +76,7 @@ class CERTRecordTest(oslotest.base.BaseTestCase):
def test_parse_invalid_cert_certificate(self):
cert_record = objects.CERT()
self.assertRaisesRegex(
exceptions.InvalidObject,
ValueError,
'Cert certificate is not valid.',
cert_record._from_string,
'DPKIX 1 RSASHA256 KR1L0GbocaIOOim1+qdHtOSrDcOsGiI2NCcxuX2/Tqc'

@ -22,7 +22,7 @@ from designate import objects
LOG = logging.getLogger(__name__)
class MXRecordTest(oslotest.base.BaseTestCase):
class RRDataMXTest(oslotest.base.BaseTestCase):
def test_parse_mx(self):
mx_record = objects.MX()
mx_record._from_string('0 mail.example.org.')

@ -21,7 +21,7 @@ from designate import objects
LOG = logging.getLogger(__name__)
class NAPTRRecordTest(oslotest.base.BaseTestCase):
class RRDataNAPTRTest(oslotest.base.BaseTestCase):
def test_parse_naptr(self):
naptr_record = objects.NAPTR()
naptr_record._from_string(

@ -20,17 +20,27 @@ LOG = logging.getLogger(__name__)
class RRDataSPFTest(oslotest.base.BaseTestCase):
def test_reject_non_quoted_spaces(self):
record = objects.SPF(data='foo bar')
recordset = objects.RecordSet(
name='www.example.test.', type='SPF',
records=objects.RecordList(objects=[
objects.Record(data='foo bar'),
])
)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
record.validate
recordset.validate
)
def test_reject_non_escaped_quotes(self):
record = objects.SPF(data='foo"bar')
recordset = objects.RecordSet(
name='www.example.test.', type='SPF',
records=objects.RecordList(objects=[
objects.Record(data='"foo"bar"'),
])
)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
record.validate
recordset.validate
)

@ -16,13 +16,13 @@
from oslo_log import log as logging
import oslotest.base
from designate.exceptions import InvalidObject
from designate import exceptions
from designate import objects
LOG = logging.getLogger(__name__)
class SSHFPecordTest(oslotest.base.BaseTestCase):
class RRDataSSHTPTest(oslotest.base.BaseTestCase):
def test_parse_sshfp(self):
sshfp_record = objects.SSHFP()
sshfp_record._from_string(
@ -34,7 +34,7 @@ class SSHFPecordTest(oslotest.base.BaseTestCase):
sshfp_record.fingerprint)
def test_validate_sshfp_signed_zero_alg(self):
record_set = objects.RecordSet(
recordset = objects.RecordSet(
name='www.example.org.', type='SSHFP',
records=objects.RecordList(objects=[
objects.Record(
@ -42,11 +42,14 @@ class SSHFPecordTest(oslotest.base.BaseTestCase):
status='ACTIVE'),
])
)
self.assertRaises(InvalidObject, record_set.validate)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
recordset.validate
)
def test_validate_sshfp_signed_zero_fptype(self):
record_set = objects.RecordSet(
recordset = objects.RecordSet(
name='www.example.org.', type='SSHFP',
records=objects.RecordList(objects=[
objects.Record(
@ -54,5 +57,8 @@ class SSHFPecordTest(oslotest.base.BaseTestCase):
status='ACTIVE'),
])
)
self.assertRaises(InvalidObject, record_set.validate)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
recordset.validate
)

@ -20,42 +20,57 @@ LOG = logging.getLogger(__name__)
class RRDataTXTTest(oslotest.base.BaseTestCase):
def test_reject_non_quoted_spaces(self):
record = objects.TXT(data='foo bar')
recordset = objects.RecordSet(
name='www.example.test.', type='TXT',
records=objects.RecordList(objects=[
objects.Record(data='foo bar'),
])
)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
record.validate
recordset.validate
)
def test_reject_non_escaped_quotes(self):
record = objects.TXT(data='foo"bar')
recordset = objects.RecordSet(
name='www.example.test.', type='TXT',
records=objects.RecordList(objects=[
objects.Record(data='"foo"bar"'),
])
)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
record.validate
recordset.validate
)
def test_multiple_strings_one_record(self):
# these quotes do not have to be escaped as
# per rfc7208 3.3 and rfc1035 3.3.14
record = objects.TXT(data='"foo" "bar"')
recordset = objects.RecordSet(
name='www.example.test.', type='TXT',
records=objects.RecordList(objects=[
objects.Record(data='"foo" "bar"'),
])
)
self.assertRaisesRegex(
exceptions.InvalidObject,
'Provided object does not match schema',
record.validate
recordset.validate
)
def test_reject_non_matched_quotes(self):
record = objects.TXT()
self.assertRaisesRegex(
exceptions.InvalidObject,
ValueError,
"TXT record is missing a double quote either at beginning "
"or at end.",
record._from_string,
'"foo'
)
self.assertRaisesRegex(
exceptions.InvalidObject,
ValueError,
"TXT record is missing a double quote either at beginning "
"or at end.",
record._from_string,