Merge "Forbid IPv4 DNS in an IPv6 OAM config"

This commit is contained in:
Zuul 2020-02-06 20:45:14 +00:00 committed by Gerrit Code Review
commit 2a6ecd8a4a
2 changed files with 66 additions and 77 deletions

View File

@ -143,7 +143,7 @@ class DNSCollection(collection.Collection):
##############
# UTILS
##############
def _check_dns_data(dns):
def _check_dns_data(dns, ip_family):
# Get data
nameservers = dns['nameservers']
idns_nameservers_list = []
@ -157,20 +157,25 @@ def _check_dns_data(dns):
ntp_list = pecan.request.dbapi.intp_get_by_isystem(dns['isystem_uuid'])
if nameservers:
for nameservers in [n.strip() for n in nameservers.split(',')]:
for nameserver in [n.strip() for n in nameservers.split(',')]:
# Semantic check each server as IP
try:
idns_nameservers_list.append(str(IPAddress(nameservers)))
idns_nameservers_list.append(str(IPAddress(nameserver)))
if ip_family and IPAddress(nameserver).version != ip_family:
raise wsme.exc.ClientSideError(_(
"IP version mismatch: was expecting "
"IPv%d, IPv%d received") % (ip_family,
IPAddress(nameserver).version))
except (AddrFormatError, ValueError):
if nameservers == 'NC':
if nameserver == 'NC':
idns_nameservers_list.append(str(""))
break
raise wsme.exc.ClientSideError(_(
"Invalid DNS nameserver target address %s "
"Please configure a valid DNS "
"address.") % (nameservers))
"address.") % (nameserver))
if len(idns_nameservers_list) == 0 or idns_nameservers_list == [""]:
if ntp_list:
@ -336,8 +341,16 @@ class DNSController(rest.RestController):
except utils.JSONPATCH_EXCEPTIONS as e:
raise exception.PatchError(patch=patch, reason=e)
LOG.warn("dns %s" % dns.as_dict())
dns = _check_dns_data(dns.as_dict())
# Since dns requests on the controller go over the oam network,
# check the ip version of the oam address pool in the database
oam_network = pecan.request.dbapi.network_get_by_type(
constants.NETWORK_TYPE_OAM)
oam_address_pool = pecan.request.dbapi.address_pool_get(
oam_network.pool_uuid)
ip_family = oam_address_pool.family
LOG.info("dns %s; ip_family: ipv%d" % (dns.as_dict(), ip_family))
dns = _check_dns_data(dns.as_dict(), ip_family)
try:
# Update only the fields that have changed

View File

@ -9,7 +9,6 @@ Tests for the API / dns / methods.
"""
import mock
import unittest
from six.moves import http_client
from sysinv.tests.api import base
from sysinv.tests.db import base as dbbase
@ -123,7 +122,6 @@ class ApiDNSPatchTestSuiteMixin(ApiDNSTestCaseMixin):
def setUp(self):
super(ApiDNSPatchTestSuiteMixin, self).setUp()
self.patch_object = self._create_db_object()
if(self.is_ipv4):
self.patch_value_no_change = '8.8.8.8,8.8.4.4'
self.patch_value_changed = '8.8.8.8'
@ -135,6 +133,7 @@ class ApiDNSPatchTestSuiteMixin(ApiDNSTestCaseMixin):
self.patch_value_more_than_permitted = '2001:4860:4860::8888,2001:4860:4860::8844,'\
'2001:4860:4860::4444,2001:4860:4860::8888'
self.patch_value_hostname = "dns.google"
self.patch_object = self._create_db_object()
def exception_dns(self):
print('Raised a fake exception')
@ -282,12 +281,32 @@ class ApiDNSListTestSuiteMixin(ApiDNSTestCaseMixin):
self.assertEqual(response[self.RESULT_KEY][0]['uuid'], self.dns_uuid)
# ============= IPv4 environment tests ==============
# Tests DNS Api operations for a Controller (defaults to IPv4)
class PlatformIPv4ControllerApiDNSPatchTestCase(ApiDNSPatchTestSuiteMixin,
base.FunctionalTest,
dbbase.ControllerHostTestCase):
pass
def test_patch_ip_version_mismatch(self):
self.is_ipv4 = True
self.patch_object = self._create_db_object()
self.patch_value_no_change = '2001:4860:4860::8888,2001:4860:4860::8844'
self.patch_value_changed = '2001:4860:4860::8888'
self.patch_value_more_than_permitted = '2001:4860:4860::8888,2001:4860:4860::8844,'\
'2001:4860:4860::4444,2001:4860:4860::8888'
self.patch_value_hostname = "dns.google"
# Update value of patchable field
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
[{'path': self.patch_path_nameserver,
'value': self.patch_value_changed,
'op': 'replace'},
{"path": self.patch_path_action,
"value": "apply",
"op": "replace"}],
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
expected_msg = "IP version mismatch: was expecting IPv4, IPv6 received"
self.assertIn(expected_msg, response.json['error_message'])
class PlatformIPv4ControllerApiDNSListTestCase(ApiDNSListTestSuiteMixin,
@ -314,7 +333,28 @@ class PlatformIPv6ControllerApiDNSPatchTestCase(ApiDNSPatchTestSuiteMixin,
dbbase.BaseIPv6Mixin,
base.FunctionalTest,
dbbase.ControllerHostTestCase):
pass
def test_patch_ip_version_mismatch(self):
self.is_ipv4 = False
self.patch_object = self._create_db_object()
self.patch_value_no_change = '8.8.8.8,8.8.4.4'
self.patch_value_changed = '8.8.8.8'
self.patch_value_more_than_permitted = '8.8.8.8,8.8.4.4,9.9.9.9,9.8.8.9'
self.patch_value_hostname = "dns.google"
# Update value of patchable field
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
[{'path': self.patch_path_nameserver,
'value': self.patch_value_changed,
'op': 'replace'},
{"path": self.patch_path_action,
"value": "apply",
"op": "replace"}],
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
expected_msg = "IP version mismatch: was expecting IPv6, IPv4 received"
self.assertIn(expected_msg, response.json['error_message'])
class PlatformIPv6ControllerApiDNSListTestCase(ApiDNSListTestSuiteMixin,
@ -336,67 +376,3 @@ class PlatformIPv6ControllerApiDNSDeleteTestCase(ApiDNSDeleteTestSuiteMixin,
base.FunctionalTest,
dbbase.ControllerHostTestCase):
pass
# ============= IPv6 DNS in IPv4 environment tests ==============
class PlatformIPv6inIPv4OAMControllerApiDNSPatchTestCase(ApiDNSPatchTestSuiteMixin,
base.FunctionalTest,
dbbase.ControllerHostTestCase):
def setUp(self):
super(PlatformIPv6inIPv4OAMControllerApiDNSPatchTestCase, self).setUp()
self.is_ipv4 = False
self.patch_object = self._create_db_object()
self.patch_value_no_change = '2001:4860:4860::8888,2001:4860:4860::8844'
self.patch_value_changed = '2001:4860:4860::8888'
self.patch_value_more_than_permitted = '2001:4860:4860::8888,2001:4860:4860::8844,'\
'2001:4860:4860::4444,2001:4860:4860::8888'
self.patch_value_hostname = "dns.google"
# See https://bugs.launchpad.net/starlingx/+bug/1860489
@unittest.expectedFailure
def test_patch_valid_change(self):
# Update value of patchable field
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
[{'path': self.patch_path_nameserver,
'value': self.patch_value_changed,
'op': 'replace'},
{"path": self.patch_path_action,
"value": "apply",
"op": "replace"}],
headers=self.API_HEADERS)
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
pass
# ============= IPv4 DNS in IPv6 environment tests ==============
class PlatformIPv4inIPv6ControllerApiDNSPatchTestCase(ApiDNSPatchTestSuiteMixin,
dbbase.BaseIPv6Mixin,
base.FunctionalTest,
dbbase.ControllerHostTestCase):
def setUp(self):
super(PlatformIPv4inIPv6ControllerApiDNSPatchTestCase, self).setUp()
self.is_ipv4 = False
self.patch_object = self._create_db_object()
self.patch_value_no_change = '8.8.8.8,8.8.4.4'
self.patch_value_changed = '8.8.8.8'
self.patch_value_more_than_permitted = '8.8.8.8,8.8.4.4,9.9.9.9,9.8.8.9'
self.patch_value_hostname = "dns.google"
# See https://bugs.launchpad.net/starlingx/+bug/1860489
@unittest.expectedFailure
def test_patch_valid_change(self):
# Update value of patchable field
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
[{'path': self.patch_path_nameserver,
'value': self.patch_value_changed,
'op': 'replace'},
{"path": self.patch_path_action,
"value": "apply",
"op": "replace"}],
headers=self.API_HEADERS)
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
pass