Forbid IPv4 DNS in an IPv6 OAM config

Implemented IP version check in DNS controller api to reject patch
operations with mismatched DNS server IP version.

Enabled and fixed relevant unit tests.

Rearranged unit test inheritance hierachy to eliminate undesired test
repetitions.

Closes-Bug: 1860489

Change-Id: Ief4a19eeea03086bb5816a13cb3a706a48bab51a
Signed-off-by: Thomas Gao <Thomas.Gao@windriver.com>
This commit is contained in:
Thomas Gao 2020-02-03 15:41:28 -05:00
parent 8ab1e2d7c6
commit c4fa36214c
2 changed files with 66 additions and 77 deletions

View File

@ -143,7 +143,7 @@ class DNSCollection(collection.Collection):
##############
# UTILS
##############
def _check_dns_data(dns):
def _check_dns_data(dns, ip_family):
# Get data
nameservers = dns['nameservers']
idns_nameservers_list = []
@ -157,20 +157,25 @@ def _check_dns_data(dns):
ntp_list = pecan.request.dbapi.intp_get_by_isystem(dns['isystem_uuid'])
if nameservers:
for nameservers in [n.strip() for n in nameservers.split(',')]:
for nameserver in [n.strip() for n in nameservers.split(',')]:
# Semantic check each server as IP
try:
idns_nameservers_list.append(str(IPAddress(nameservers)))
idns_nameservers_list.append(str(IPAddress(nameserver)))
if ip_family and IPAddress(nameserver).version != ip_family:
raise wsme.exc.ClientSideError(_(
"IP version mismatch: was expecting "
"IPv%d, IPv%d received") % (ip_family,
IPAddress(nameserver).version))
except (AddrFormatError, ValueError):
if nameservers == 'NC':
if nameserver == 'NC':
idns_nameservers_list.append(str(""))
break
raise wsme.exc.ClientSideError(_(
"Invalid DNS nameserver target address %s "
"Please configure a valid DNS "
"address.") % (nameservers))
"address.") % (nameserver))
if len(idns_nameservers_list) == 0 or idns_nameservers_list == [""]:
if ntp_list:
@ -336,8 +341,16 @@ class DNSController(rest.RestController):
except utils.JSONPATCH_EXCEPTIONS as e:
raise exception.PatchError(patch=patch, reason=e)
LOG.warn("dns %s" % dns.as_dict())
dns = _check_dns_data(dns.as_dict())
# Since dns requests on the controller go over the oam network,
# check the ip version of the oam address pool in the database
oam_network = pecan.request.dbapi.network_get_by_type(
constants.NETWORK_TYPE_OAM)
oam_address_pool = pecan.request.dbapi.address_pool_get(
oam_network.pool_uuid)
ip_family = oam_address_pool.family
LOG.info("dns %s; ip_family: ipv%d" % (dns.as_dict(), ip_family))
dns = _check_dns_data(dns.as_dict(), ip_family)
try:
# Update only the fields that have changed

View File

@ -9,7 +9,6 @@ Tests for the API / dns / methods.
"""
import mock
import unittest
from six.moves import http_client
from sysinv.tests.api import base
from sysinv.tests.db import base as dbbase
@ -123,7 +122,6 @@ class ApiDNSPatchTestSuiteMixin(ApiDNSTestCaseMixin):
def setUp(self):
super(ApiDNSPatchTestSuiteMixin, self).setUp()
self.patch_object = self._create_db_object()
if(self.is_ipv4):
self.patch_value_no_change = '8.8.8.8,8.8.4.4'
self.patch_value_changed = '8.8.8.8'
@ -135,6 +133,7 @@ class ApiDNSPatchTestSuiteMixin(ApiDNSTestCaseMixin):
self.patch_value_more_than_permitted = '2001:4860:4860::8888,2001:4860:4860::8844,'\
'2001:4860:4860::4444,2001:4860:4860::8888'
self.patch_value_hostname = "dns.google"
self.patch_object = self._create_db_object()
def exception_dns(self):
print('Raised a fake exception')
@ -282,12 +281,32 @@ class ApiDNSListTestSuiteMixin(ApiDNSTestCaseMixin):
self.assertEqual(response[self.RESULT_KEY][0]['uuid'], self.dns_uuid)
# ============= IPv4 environment tests ==============
# Tests DNS Api operations for a Controller (defaults to IPv4)
class PlatformIPv4ControllerApiDNSPatchTestCase(ApiDNSPatchTestSuiteMixin,
base.FunctionalTest,
dbbase.ControllerHostTestCase):
pass
def test_patch_ip_version_mismatch(self):
self.is_ipv4 = True
self.patch_object = self._create_db_object()
self.patch_value_no_change = '2001:4860:4860::8888,2001:4860:4860::8844'
self.patch_value_changed = '2001:4860:4860::8888'
self.patch_value_more_than_permitted = '2001:4860:4860::8888,2001:4860:4860::8844,'\
'2001:4860:4860::4444,2001:4860:4860::8888'
self.patch_value_hostname = "dns.google"
# Update value of patchable field
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
[{'path': self.patch_path_nameserver,
'value': self.patch_value_changed,
'op': 'replace'},
{"path": self.patch_path_action,
"value": "apply",
"op": "replace"}],
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
expected_msg = "IP version mismatch: was expecting IPv4, IPv6 received"
self.assertIn(expected_msg, response.json['error_message'])
class PlatformIPv4ControllerApiDNSListTestCase(ApiDNSListTestSuiteMixin,
@ -314,7 +333,28 @@ class PlatformIPv6ControllerApiDNSPatchTestCase(ApiDNSPatchTestSuiteMixin,
dbbase.BaseIPv6Mixin,
base.FunctionalTest,
dbbase.ControllerHostTestCase):
pass
def test_patch_ip_version_mismatch(self):
self.is_ipv4 = False
self.patch_object = self._create_db_object()
self.patch_value_no_change = '8.8.8.8,8.8.4.4'
self.patch_value_changed = '8.8.8.8'
self.patch_value_more_than_permitted = '8.8.8.8,8.8.4.4,9.9.9.9,9.8.8.9'
self.patch_value_hostname = "dns.google"
# Update value of patchable field
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
[{'path': self.patch_path_nameserver,
'value': self.patch_value_changed,
'op': 'replace'},
{"path": self.patch_path_action,
"value": "apply",
"op": "replace"}],
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
expected_msg = "IP version mismatch: was expecting IPv6, IPv4 received"
self.assertIn(expected_msg, response.json['error_message'])
class PlatformIPv6ControllerApiDNSListTestCase(ApiDNSListTestSuiteMixin,
@ -336,67 +376,3 @@ class PlatformIPv6ControllerApiDNSDeleteTestCase(ApiDNSDeleteTestSuiteMixin,
base.FunctionalTest,
dbbase.ControllerHostTestCase):
pass
# ============= IPv6 DNS in IPv4 environment tests ==============
class PlatformIPv6inIPv4OAMControllerApiDNSPatchTestCase(ApiDNSPatchTestSuiteMixin,
base.FunctionalTest,
dbbase.ControllerHostTestCase):
def setUp(self):
super(PlatformIPv6inIPv4OAMControllerApiDNSPatchTestCase, self).setUp()
self.is_ipv4 = False
self.patch_object = self._create_db_object()
self.patch_value_no_change = '2001:4860:4860::8888,2001:4860:4860::8844'
self.patch_value_changed = '2001:4860:4860::8888'
self.patch_value_more_than_permitted = '2001:4860:4860::8888,2001:4860:4860::8844,'\
'2001:4860:4860::4444,2001:4860:4860::8888'
self.patch_value_hostname = "dns.google"
# See https://bugs.launchpad.net/starlingx/+bug/1860489
@unittest.expectedFailure
def test_patch_valid_change(self):
# Update value of patchable field
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
[{'path': self.patch_path_nameserver,
'value': self.patch_value_changed,
'op': 'replace'},
{"path": self.patch_path_action,
"value": "apply",
"op": "replace"}],
headers=self.API_HEADERS)
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
pass
# ============= IPv4 DNS in IPv6 environment tests ==============
class PlatformIPv4inIPv6ControllerApiDNSPatchTestCase(ApiDNSPatchTestSuiteMixin,
dbbase.BaseIPv6Mixin,
base.FunctionalTest,
dbbase.ControllerHostTestCase):
def setUp(self):
super(PlatformIPv4inIPv6ControllerApiDNSPatchTestCase, self).setUp()
self.is_ipv4 = False
self.patch_object = self._create_db_object()
self.patch_value_no_change = '8.8.8.8,8.8.4.4'
self.patch_value_changed = '8.8.8.8'
self.patch_value_more_than_permitted = '8.8.8.8,8.8.4.4,9.9.9.9,9.8.8.9'
self.patch_value_hostname = "dns.google"
# See https://bugs.launchpad.net/starlingx/+bug/1860489
@unittest.expectedFailure
def test_patch_valid_change(self):
# Update value of patchable field
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
[{'path': self.patch_path_nameserver,
'value': self.patch_value_changed,
'op': 'replace'},
{"path": self.patch_path_action,
"value": "apply",
"op": "replace"}],
headers=self.API_HEADERS)
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
pass