Add validation for MX, TXT, and SSHFP records

Added validation to ensure that TXT records cannot end
with a '\' character.  Also added validation to ensure
that a signed zero (-0) cannot be used as an MX record
priority or SSHFP record fptype.

Change-Id: Id9bd12f42d7fb5fab9d7a352080f40eec3e9b6b7
Closes-Bug: 1533402
This commit is contained in:
Rahman Syed 2016-01-26 11:38:05 -06:00
parent 691a5f4c7b
commit eb3fe8ab59
8 changed files with 144 additions and 8 deletions

View File

@ -46,6 +46,9 @@ class MX(Record):
def _from_string(self, value): def _from_string(self, value):
priority, exchange = value.split(' ') priority, exchange = value.split(' ')
if repr(int(priority)) != priority:
raise ValueError('Value is not an integer')
self.priority = int(priority) self.priority = int(priority)
self.exchange = exchange self.exchange = exchange

View File

@ -53,6 +53,10 @@ class SSHFP(Record):
def _from_string(self, value): def _from_string(self, value):
algorithm, fp_type, fingerprint = value.split(' ') algorithm, fp_type, fingerprint = value.split(' ')
for value in {algorithm, fp_type}:
if repr(int(value)) != value:
raise ValueError('Value is not an integer')
self.algorithm = int(algorithm) self.algorithm = int(algorithm)
self.fp_type = int(fp_type) self.fp_type = int(fp_type)
self.fingerprint = fingerprint self.fingerprint = fingerprint

View File

@ -25,6 +25,7 @@ class TXT(Record):
'txt_data': { 'txt_data': {
'schema': { 'schema': {
'type': 'string', 'type': 'string',
'format': 'txt-data',
'maxLength': 255, 'maxLength': 255,
}, },
'required': True 'required': True

View File

@ -49,7 +49,7 @@ RE_FIP_ID = r'^(?P<region>[A-Za-z0-9\.\-_]{1,100}):' \
r'(?P<id>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-' \ r'(?P<id>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-' \
r'[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\Z' r'[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\Z'
RE_SSHFP = r'^[0-9A-Fa-f]{40}\Z' RE_SSHFP_FINGERPRINT = r'^[0-9A-Fa-f]{40}\Z'
draft3_format_checker = jsonschema.draft3_format_checker draft3_format_checker = jsonschema.draft3_format_checker
@ -141,6 +141,17 @@ def is_srv_hostname(instance):
return True return True
@draft4_format_checker.checks("txt-data")
def is_txt_data(instance):
if not isinstance(instance, compat.str_types):
return True
if instance.endswith('\\'):
return False
return True
@draft3_format_checker.checks("tld-name") @draft3_format_checker.checks("tld-name")
@draft4_format_checker.checks("tldname") @draft4_format_checker.checks("tldname")
def is_tldname(instance): def is_tldname(instance):
@ -172,14 +183,11 @@ def is_email(instance):
@draft4_format_checker.checks("sshfp") @draft4_format_checker.checks("sshfp")
def is_sshfp(instance): def is_sshfp_fingerprint(instance):
# TODO(kiall): This isn't actually validating an SSH FP, It's trying to
# validate *part* of a SSHFP, we should either rename this
# or actually validate a SSHFP rdata in it's entireity.
if not isinstance(instance, compat.str_types): if not isinstance(instance, compat.str_types):
return True return True
if not re.match(RE_SSHFP, instance): if not re.match(RE_SSHFP_FINGERPRINT, instance):
return False return False
return True return True

View File

View File

@ -0,0 +1,52 @@
# Copyright 2016 Rackspace
#
# Author: Rahman Syed <rahman.syed@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import oslotest.base
from designate import objects
from designate.exceptions import InvalidObject
LOG = logging.getLogger(__name__)
def debug(*a, **kw):
for v in a:
LOG.debug(repr(v))
for k in sorted(kw):
LOG.debug("%s: %s", k, repr(kw[k]))
class MXRecordTest(oslotest.base.BaseTestCase):
def test_parse_mx(self):
mx_record = objects.MX()
mx_record._from_string('0 mail.example.org.')
self.assertEqual(0, mx_record.priority)
self.assertEqual('mail.example.org.', mx_record.exchange)
def test_validate_mx_signed_zero(self):
rs = objects.RecordSet(
name='www.example.org.', type='MX',
records=objects.RecordList(objects=[
objects.Record(data='-0 mail.example.org.',
status='ACTIVE'),
])
)
self.assertRaises(InvalidObject, rs.validate)

View File

@ -0,0 +1,68 @@
# Copyright 2016 Rackspace
#
# Author: Rahman Syed <rahman.syed@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import oslotest.base
from designate import objects
from designate.exceptions import InvalidObject
LOG = logging.getLogger(__name__)
def debug(*a, **kw):
for v in a:
LOG.debug(repr(v))
for k in sorted(kw):
LOG.debug("%s: %s", k, repr(kw[k]))
class SSHFPecordTest(oslotest.base.BaseTestCase):
def test_parse_sshfp(self):
sshfp_record = objects.SSHFP()
sshfp_record._from_string(
'0 0 72d30d211ce8c464de2811e534de23b9be9b4dc4')
self.assertEqual(0, sshfp_record.algorithm)
self.assertEqual(0, sshfp_record.fp_type)
self.assertEqual('72d30d211ce8c464de2811e534de23b9be9b4dc4',
sshfp_record.fingerprint)
def test_validate_sshfp_signed_zero_alg(self):
rs = objects.RecordSet(
name='www.example.org.', type='SSHFP',
records=objects.RecordList(objects=[
objects.Record(
data='-0 0 72d30d211ce8c464de2811e534de23b9be9b4dc4',
status='ACTIVE'),
])
)
self.assertRaises(InvalidObject, rs.validate)
def test_validate_sshfp_signed_zero_fptype(self):
rs = objects.RecordSet(
name='www.example.org.', type='SSHFP',
records=objects.RecordList(objects=[
objects.Record(
data='0 -0 72d30d211ce8c464de2811e534de23b9be9b4dc4',
status='ACTIVE'),
])
)
self.assertRaises(InvalidObject, rs.validate)

View File

@ -332,11 +332,11 @@ class SchemaFormatTest(TestCase):
] ]
for sshfp in valid_sshfps: for sshfp in valid_sshfps:
self.assertTrue(format.is_sshfp(sshfp), self.assertTrue(format.is_sshfp_fingerprint(sshfp),
'Expected Valid: %s' % sshfp) 'Expected Valid: %s' % sshfp)
for sshfp in invalid_sshfps: for sshfp in invalid_sshfps:
self.assertFalse(format.is_sshfp(sshfp), self.assertFalse(format.is_sshfp_fingerprint(sshfp),
'Expected Invalid: %s' % sshfp) 'Expected Invalid: %s' % sshfp)
def test_is_uuid(self): def test_is_uuid(self):