Fix allowed address pair validation
Neutron requires the allowed address pair ip address to be
either an ip or a cidr.
https://review.opendev.org/#/c/575265/ made heat verify for
cidr only.
Change-Id: I2cc2785cb32cf8d788af6262992b1b76107c8292
Story: 2005674
Task: 30985
(cherry picked from commit 5e93b3e4cf
)
This commit is contained in:
parent
a4fc781e7b
commit
cec8b079c0
|
@ -107,14 +107,18 @@ class CIDRConstraint(constraints.BaseCustomConstraint):
|
||||||
|
|
||||||
def validate(self, value, context, template=None):
|
def validate(self, value, context, template=None):
|
||||||
try:
|
try:
|
||||||
netaddr.IPNetwork(value, implicit_prefix=True)
|
if '/' in value:
|
||||||
msg = validators.validate_subnet(value)
|
msg = validators.validate_subnet(value)
|
||||||
|
else:
|
||||||
|
msg = validators.validate_ip_address(value)
|
||||||
if msg is not None:
|
if msg is not None:
|
||||||
self._error_message = msg
|
self._error_message = msg
|
||||||
return False
|
return False
|
||||||
return True
|
else:
|
||||||
except Exception as ex:
|
return True
|
||||||
self._error_message = 'Invalid net cidr %s ' % six.text_type(ex)
|
except Exception:
|
||||||
|
self._error_message = '{} is not a valid IP or CIDR'.format(
|
||||||
|
value)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -102,15 +102,22 @@ class TestCIDRConstraint(common.HeatTestCase):
|
||||||
super(TestCIDRConstraint, self).setUp()
|
super(TestCIDRConstraint, self).setUp()
|
||||||
self.constraint = cc.CIDRConstraint()
|
self.constraint = cc.CIDRConstraint()
|
||||||
|
|
||||||
def test_valid_cidr_format(self):
|
def test_valid_format(self):
|
||||||
validate_format = [
|
validate_format = [
|
||||||
'10.0.0.0/24',
|
'10.0.0.0/24',
|
||||||
|
'1.1.1.1',
|
||||||
|
'1.0.1.1',
|
||||||
|
'255.255.255.255',
|
||||||
'6000::/64',
|
'6000::/64',
|
||||||
|
'2002:2002::20c:29ff:fe7d:811a',
|
||||||
|
'::1',
|
||||||
|
'2002::',
|
||||||
|
'2002::1',
|
||||||
]
|
]
|
||||||
for cidr in validate_format:
|
for value in validate_format:
|
||||||
self.assertTrue(self.constraint.validate(cidr, None))
|
self.assertTrue(self.constraint.validate(value, None))
|
||||||
|
|
||||||
def test_invalid_cidr_format(self):
|
def test_invalid_format(self):
|
||||||
invalidate_format = [
|
invalidate_format = [
|
||||||
'::/129',
|
'::/129',
|
||||||
'Invalid cidr',
|
'Invalid cidr',
|
||||||
|
@ -120,25 +127,45 @@ class TestCIDRConstraint(common.HeatTestCase):
|
||||||
'10.0/24',
|
'10.0/24',
|
||||||
'10.0.a.10/24',
|
'10.0.a.10/24',
|
||||||
'8.8.8.0/ 24',
|
'8.8.8.0/ 24',
|
||||||
'8.8.8.8'
|
None,
|
||||||
|
123,
|
||||||
|
'1.1',
|
||||||
|
'1.1.',
|
||||||
|
'1.1.1',
|
||||||
|
'1.1.1.',
|
||||||
|
'1.1.1.256',
|
||||||
|
'1.a.1.1',
|
||||||
|
'2002::2001::1',
|
||||||
|
'2002::g',
|
||||||
|
'2001::0::',
|
||||||
|
'20c:29ff:fe7d:811a'
|
||||||
]
|
]
|
||||||
for cidr in invalidate_format:
|
for value in invalidate_format:
|
||||||
self.assertFalse(self.constraint.validate(cidr, None))
|
self.assertFalse(self.constraint.validate(value, None))
|
||||||
|
|
||||||
@mock.patch('neutron_lib.api.validators.validate_subnet')
|
@mock.patch('neutron_lib.api.validators.validate_subnet')
|
||||||
def test_validate(self, mock_validate_subnet):
|
@mock.patch('neutron_lib.api.validators.validate_ip_address')
|
||||||
|
def test_validate(self, mock_validate_ip, mock_validate_subnet):
|
||||||
test_formats = [
|
test_formats = [
|
||||||
'10.0.0/24',
|
'10.0.0/24',
|
||||||
'10.0/24',
|
'10.0/24',
|
||||||
]
|
'10.0.0.0/33',
|
||||||
self.assertFalse(self.constraint.validate('10.0.0.0/33', None))
|
|
||||||
|
|
||||||
|
]
|
||||||
for cidr in test_formats:
|
for cidr in test_formats:
|
||||||
self.assertFalse(self.constraint.validate(cidr, None))
|
self.assertFalse(self.constraint.validate(cidr, None))
|
||||||
|
mock_validate_subnet.assert_called_with(cidr)
|
||||||
|
self.assertEqual(mock_validate_subnet.call_count, 3)
|
||||||
|
|
||||||
mock_validate_subnet.assert_any_call('10.0.0/24')
|
test_formats = [
|
||||||
mock_validate_subnet.assert_called_with('10.0/24')
|
'10.0.0',
|
||||||
self.assertEqual(mock_validate_subnet.call_count, 2)
|
'10.0',
|
||||||
|
'10.0.0.0',
|
||||||
|
]
|
||||||
|
for ip in test_formats:
|
||||||
|
self.assertFalse(self.constraint.validate(ip, None))
|
||||||
|
mock_validate_ip.assert_called_with(ip)
|
||||||
|
self.assertEqual(mock_validate_ip.call_count, 3)
|
||||||
|
|
||||||
|
|
||||||
class TestISO8601Constraint(common.HeatTestCase):
|
class TestISO8601Constraint(common.HeatTestCase):
|
||||||
|
|
|
@ -183,6 +183,8 @@ class NeutronPortTest(common.HeatTestCase):
|
||||||
|
|
||||||
def test_allowed_address_pair(self):
|
def test_allowed_address_pair(self):
|
||||||
t = template_format.parse(neutron_port_with_address_pair_template)
|
t = template_format.parse(neutron_port_with_address_pair_template)
|
||||||
|
t['resources']['port']['properties'][
|
||||||
|
'allowed_address_pairs'][0]['ip_address'] = '10.0.3.21'
|
||||||
stack = utils.parse_stack(t)
|
stack = utils.parse_stack(t)
|
||||||
|
|
||||||
self.find_mock.return_value = 'abcd1234'
|
self.find_mock.return_value = 'abcd1234'
|
||||||
|
@ -200,7 +202,7 @@ class NeutronPortTest(common.HeatTestCase):
|
||||||
self.create_mock.assert_called_once_with({'port': {
|
self.create_mock.assert_called_once_with({'port': {
|
||||||
'network_id': u'abcd1234',
|
'network_id': u'abcd1234',
|
||||||
'allowed_address_pairs': [{
|
'allowed_address_pairs': [{
|
||||||
'ip_address': u'10.0.3.21/8',
|
'ip_address': u'10.0.3.21',
|
||||||
'mac_address': u'00-B0-D0-86-BB-F7'
|
'mac_address': u'00-B0-D0-86-BB-F7'
|
||||||
}],
|
}],
|
||||||
'name': utils.PhysName(stack.name, 'port'),
|
'name': utils.PhysName(stack.name, 'port'),
|
||||||
|
|
|
@ -551,8 +551,8 @@ class NeutronSubnetTest(common.HeatTestCase):
|
||||||
rsrc.validate)
|
rsrc.validate)
|
||||||
msg = ("Property error: "
|
msg = ("Property error: "
|
||||||
"resources.sub_net.properties.host_routes[0].destination: "
|
"resources.sub_net.properties.host_routes[0].destination: "
|
||||||
"Error validating value 'invalid_cidr': Invalid net cidr "
|
"Error validating value 'invalid_cidr': "
|
||||||
"invalid IPNetwork invalid_cidr ")
|
"'invalid_cidr' is not a valid IP address")
|
||||||
self.assertEqual(msg, six.text_type(ex))
|
self.assertEqual(msg, six.text_type(ex))
|
||||||
|
|
||||||
def test_ipv6_validate_ra_mode(self):
|
def test_ipv6_validate_ra_mode(self):
|
||||||
|
|
Loading…
Reference in New Issue