extra_dhcp_opt add checks for empty strings
When passing extra-dhcp-opt into the port-create where an empty string is provided as opt_value=' ', the create and update routine will load the empty string into the DB. The result when written to the opts file is: "tag:ece4c8aa-15c9-4f6b-8c42-7d4e285734bf,option:server-ip-address", which when read by dnsmasq has been tested to cause dnsmasq to segment fault. Change-Id: I31de4a3d27092bb219d20221c5ef5a6b22e050dc Closes-Bug: #1257467
This commit is contained in:
parent
3b42338735
commit
794a1325df
@ -74,9 +74,22 @@ def _validate_values(data, valid_values=None):
|
|||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_not_empty_string_or_none(data, max_len=None):
|
||||||
|
if data is not None:
|
||||||
|
return _validate_not_empty_string(data, max_len=max_len)
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_not_empty_string(data, max_len=None):
|
||||||
|
msg = _validate_string(data, max_len=max_len)
|
||||||
|
if msg:
|
||||||
|
return msg
|
||||||
|
if not data.strip():
|
||||||
|
return _("'%s' Blank strings are not permitted") % data
|
||||||
|
|
||||||
|
|
||||||
def _validate_string_or_none(data, max_len=None):
|
def _validate_string_or_none(data, max_len=None):
|
||||||
if data is not None:
|
if data is not None:
|
||||||
return _validate_string(data, max_len=None)
|
return _validate_string(data, max_len=max_len)
|
||||||
|
|
||||||
|
|
||||||
def _validate_string(data, max_len=None):
|
def _validate_string(data, max_len=None):
|
||||||
@ -527,6 +540,9 @@ validators = {'type:dict': _validate_dict,
|
|||||||
'type:regex': _validate_regex,
|
'type:regex': _validate_regex,
|
||||||
'type:string': _validate_string,
|
'type:string': _validate_string,
|
||||||
'type:string_or_none': _validate_string_or_none,
|
'type:string_or_none': _validate_string_or_none,
|
||||||
|
'type:not_empty_string': _validate_not_empty_string,
|
||||||
|
'type:not_empty_string_or_none':
|
||||||
|
_validate_not_empty_string_or_none,
|
||||||
'type:subnet': _validate_subnet,
|
'type:subnet': _validate_subnet,
|
||||||
'type:subnet_list': _validate_subnet_list,
|
'type:subnet_list': _validate_subnet_list,
|
||||||
'type:uuid': _validate_uuid,
|
'type:uuid': _validate_uuid,
|
||||||
|
@ -26,8 +26,7 @@ class ExtraDhcpOptNotFound(exceptions.NotFound):
|
|||||||
|
|
||||||
|
|
||||||
class ExtraDhcpOptBadData(exceptions.InvalidInput):
|
class ExtraDhcpOptBadData(exceptions.InvalidInput):
|
||||||
message = _("Invalid data format for extra-dhcp-opt, "
|
message = _("Invalid data format for extra-dhcp-opt: %(data)s")
|
||||||
"provide a list of dicts: %(data)s")
|
|
||||||
|
|
||||||
|
|
||||||
def _validate_list_of_dict_or_none(data, key_specs=None):
|
def _validate_list_of_dict_or_none(data, key_specs=None):
|
||||||
@ -39,6 +38,7 @@ def _validate_list_of_dict_or_none(data, key_specs=None):
|
|||||||
if msg:
|
if msg:
|
||||||
raise ExtraDhcpOptBadData(data=msg)
|
raise ExtraDhcpOptBadData(data=msg)
|
||||||
|
|
||||||
|
|
||||||
attr.validators['type:list_of_dict_or_none'] = _validate_list_of_dict_or_none
|
attr.validators['type:list_of_dict_or_none'] = _validate_list_of_dict_or_none
|
||||||
|
|
||||||
# Attribute Map
|
# Attribute Map
|
||||||
@ -54,8 +54,9 @@ EXTENDED_ATTRIBUTES_2_0 = {
|
|||||||
'validate': {
|
'validate': {
|
||||||
'type:list_of_dict_or_none': {
|
'type:list_of_dict_or_none': {
|
||||||
'id': {'type:uuid': None, 'required': False},
|
'id': {'type:uuid': None, 'required': False},
|
||||||
'opt_name': {'type:string': None, 'required': True},
|
'opt_name': {'type:not_empty_string': None,
|
||||||
'opt_value': {'type:string_or_none': None,
|
'required': True},
|
||||||
|
'opt_value': {'type:not_empty_string_or_none': None,
|
||||||
'required': True}}}}}}
|
'required': True}}}}}}
|
||||||
|
|
||||||
|
|
||||||
|
@ -65,6 +65,24 @@ class TestAttributes(base.BaseTestCase):
|
|||||||
msg = attributes._validate_values(7, (4, 6))
|
msg = attributes._validate_values(7, (4, 6))
|
||||||
self.assertEqual(msg, "'7' is not in (4, 6)")
|
self.assertEqual(msg, "'7' is not in (4, 6)")
|
||||||
|
|
||||||
|
def test_validate_not_empty_string(self):
|
||||||
|
msg = attributes._validate_not_empty_string(' ', None)
|
||||||
|
self.assertEqual(msg, u"' ' Blank strings are not permitted")
|
||||||
|
|
||||||
|
def test_validate_not_empty_string_or_none(self):
|
||||||
|
msg = attributes._validate_not_empty_string_or_none(' ', None)
|
||||||
|
self.assertEqual(msg, u"' ' Blank strings are not permitted")
|
||||||
|
|
||||||
|
msg = attributes._validate_not_empty_string_or_none(None, None)
|
||||||
|
self.assertIsNone(msg)
|
||||||
|
|
||||||
|
def test_validate_string_or_none(self):
|
||||||
|
msg = attributes._validate_not_empty_string_or_none('test', None)
|
||||||
|
self.assertIsNone(msg)
|
||||||
|
|
||||||
|
msg = attributes._validate_not_empty_string_or_none(None, None)
|
||||||
|
self.assertIsNone(msg)
|
||||||
|
|
||||||
def test_validate_string(self):
|
def test_validate_string(self):
|
||||||
msg = attributes._validate_string(None, None)
|
msg = attributes._validate_string(None, None)
|
||||||
self.assertEqual(msg, "'None' is not a valid string")
|
self.assertEqual(msg, "'None' is not a valid string")
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
import copy
|
import copy
|
||||||
|
import webob.exc
|
||||||
|
|
||||||
from neutron.db import db_base_plugin_v2
|
from neutron.db import db_base_plugin_v2
|
||||||
from neutron.db import extradhcpopt_db as edo_db
|
from neutron.db import extradhcpopt_db as edo_db
|
||||||
@ -87,22 +88,22 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||||||
port['port'][edo_ext.EXTRADHCPOPTS])
|
port['port'][edo_ext.EXTRADHCPOPTS])
|
||||||
|
|
||||||
def test_create_port_with_none_extradhcpopts(self):
|
def test_create_port_with_none_extradhcpopts(self):
|
||||||
new_list = [{'opt_name': 'bootfile-name',
|
opt_list = [{'opt_name': 'bootfile-name',
|
||||||
'opt_value': None},
|
'opt_value': None},
|
||||||
{'opt_name': 'server-ip-address',
|
{'opt_name': 'server-ip-address',
|
||||||
'opt_value': '123.123.123.456'},
|
'opt_value': '123.123.123.456'},
|
||||||
{'opt_name': 'tftp-server',
|
{'opt_name': 'tftp-server',
|
||||||
'opt_value': '123.123.123.123'}]
|
'opt_value': '123.123.123.123'}]
|
||||||
new_dict = [{'opt_name': 'server-ip-address',
|
expected = [{'opt_name': 'server-ip-address',
|
||||||
'opt_value': '123.123.123.456'},
|
'opt_value': '123.123.123.456'},
|
||||||
{'opt_name': 'tftp-server',
|
{'opt_name': 'tftp-server',
|
||||||
'opt_value': '123.123.123.123'}]
|
'opt_value': '123.123.123.123'}]
|
||||||
|
|
||||||
params = {edo_ext.EXTRADHCPOPTS: new_list,
|
params = {edo_ext.EXTRADHCPOPTS: opt_list,
|
||||||
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
|
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
|
||||||
|
|
||||||
with self.port(**params) as port:
|
with self.port(**params) as port:
|
||||||
self._check_opts(new_dict,
|
self._check_opts(expected,
|
||||||
port['port'][edo_ext.EXTRADHCPOPTS])
|
port['port'][edo_ext.EXTRADHCPOPTS])
|
||||||
|
|
||||||
def test_update_port_with_extradhcpopts_with_same(self):
|
def test_update_port_with_extradhcpopts_with_same(self):
|
||||||
@ -112,8 +113,8 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||||||
{'opt_name': 'server-ip-address',
|
{'opt_name': 'server-ip-address',
|
||||||
'opt_value': '123.123.123.456'}]
|
'opt_value': '123.123.123.456'}]
|
||||||
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
|
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
|
||||||
new_opts = opt_list[:]
|
expected_opts = opt_list[:]
|
||||||
for i in new_opts:
|
for i in expected_opts:
|
||||||
if i['opt_name'] == upd_opts[0]['opt_name']:
|
if i['opt_name'] == upd_opts[0]['opt_name']:
|
||||||
i['opt_value'] = upd_opts[0]['opt_value']
|
i['opt_value'] = upd_opts[0]['opt_value']
|
||||||
break
|
break
|
||||||
@ -127,7 +128,27 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||||||
req = self.new_update_request('ports', update_port,
|
req = self.new_update_request('ports', update_port,
|
||||||
port['port']['id'])
|
port['port']['id'])
|
||||||
port = self.deserialize('json', req.get_response(self.api))
|
port = self.deserialize('json', req.get_response(self.api))
|
||||||
self._check_opts(new_opts,
|
self._check_opts(expected_opts,
|
||||||
|
port['port'][edo_ext.EXTRADHCPOPTS])
|
||||||
|
|
||||||
|
def test_update_port_with_additional_extradhcpopt(self):
|
||||||
|
opt_list = [{'opt_name': 'tftp-server',
|
||||||
|
'opt_value': '123.123.123.123'},
|
||||||
|
{'opt_name': 'server-ip-address',
|
||||||
|
'opt_value': '123.123.123.456'}]
|
||||||
|
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
|
||||||
|
expected_opts = copy.deepcopy(opt_list)
|
||||||
|
expected_opts.append(upd_opts[0])
|
||||||
|
params = {edo_ext.EXTRADHCPOPTS: opt_list,
|
||||||
|
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
|
||||||
|
|
||||||
|
with self.port(**params) as port:
|
||||||
|
update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}}
|
||||||
|
|
||||||
|
req = self.new_update_request('ports', update_port,
|
||||||
|
port['port']['id'])
|
||||||
|
port = self.deserialize('json', req.get_response(self.api))
|
||||||
|
self._check_opts(expected_opts,
|
||||||
port['port'][edo_ext.EXTRADHCPOPTS])
|
port['port'][edo_ext.EXTRADHCPOPTS])
|
||||||
|
|
||||||
def test_update_port_with_extradhcpopts(self):
|
def test_update_port_with_extradhcpopts(self):
|
||||||
@ -137,8 +158,8 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||||||
{'opt_name': 'server-ip-address',
|
{'opt_name': 'server-ip-address',
|
||||||
'opt_value': '123.123.123.456'}]
|
'opt_value': '123.123.123.456'}]
|
||||||
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
|
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
|
||||||
new_opts = copy.deepcopy(opt_list)
|
expected_opts = copy.deepcopy(opt_list)
|
||||||
for i in new_opts:
|
for i in expected_opts:
|
||||||
if i['opt_name'] == upd_opts[0]['opt_name']:
|
if i['opt_name'] == upd_opts[0]['opt_name']:
|
||||||
i['opt_value'] = upd_opts[0]['opt_value']
|
i['opt_value'] = upd_opts[0]['opt_value']
|
||||||
break
|
break
|
||||||
@ -152,28 +173,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||||||
req = self.new_update_request('ports', update_port,
|
req = self.new_update_request('ports', update_port,
|
||||||
port['port']['id'])
|
port['port']['id'])
|
||||||
port = self.deserialize('json', req.get_response(self.api))
|
port = self.deserialize('json', req.get_response(self.api))
|
||||||
self._check_opts(new_opts,
|
self._check_opts(expected_opts,
|
||||||
port['port'][edo_ext.EXTRADHCPOPTS])
|
|
||||||
|
|
||||||
def test_update_port_with_additional_extradhcpopt(self):
|
|
||||||
opt_list = [{'opt_name': 'tftp-server',
|
|
||||||
'opt_value': '123.123.123.123'},
|
|
||||||
{'opt_name': 'server-ip-address',
|
|
||||||
'opt_value': '123.123.123.456'}]
|
|
||||||
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
|
|
||||||
new_opts = copy.deepcopy(opt_list)
|
|
||||||
new_opts.append(upd_opts[0])
|
|
||||||
|
|
||||||
params = {edo_ext.EXTRADHCPOPTS: opt_list,
|
|
||||||
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
|
|
||||||
|
|
||||||
with self.port(**params) as port:
|
|
||||||
update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}}
|
|
||||||
|
|
||||||
req = self.new_update_request('ports', update_port,
|
|
||||||
port['port']['id'])
|
|
||||||
port = self.deserialize('json', req.get_response(self.api))
|
|
||||||
self._check_opts(new_opts,
|
|
||||||
port['port'][edo_ext.EXTRADHCPOPTS])
|
port['port'][edo_ext.EXTRADHCPOPTS])
|
||||||
|
|
||||||
def test_update_port_with_extradhcpopt_delete(self):
|
def test_update_port_with_extradhcpopt_delete(self):
|
||||||
@ -183,10 +183,10 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||||||
{'opt_name': 'server-ip-address',
|
{'opt_name': 'server-ip-address',
|
||||||
'opt_value': '123.123.123.456'}]
|
'opt_value': '123.123.123.456'}]
|
||||||
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': None}]
|
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': None}]
|
||||||
new_opts = []
|
expected_opts = []
|
||||||
|
|
||||||
new_opts = [opt for opt in opt_list
|
expected_opts = [opt for opt in opt_list
|
||||||
if opt['opt_name'] != 'bootfile-name']
|
if opt['opt_name'] != 'bootfile-name']
|
||||||
|
|
||||||
params = {edo_ext.EXTRADHCPOPTS: opt_list,
|
params = {edo_ext.EXTRADHCPOPTS: opt_list,
|
||||||
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
|
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
|
||||||
@ -197,7 +197,7 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||||||
req = self.new_update_request('ports', update_port,
|
req = self.new_update_request('ports', update_port,
|
||||||
port['port']['id'])
|
port['port']['id'])
|
||||||
port = self.deserialize('json', req.get_response(self.api))
|
port = self.deserialize('json', req.get_response(self.api))
|
||||||
self._check_opts(new_opts,
|
self._check_opts(expected_opts,
|
||||||
port['port'][edo_ext.EXTRADHCPOPTS])
|
port['port'][edo_ext.EXTRADHCPOPTS])
|
||||||
|
|
||||||
def test_update_port_without_extradhcpopt_delete(self):
|
def test_update_port_without_extradhcpopt_delete(self):
|
||||||
@ -226,3 +226,41 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
|
|||||||
port = self.deserialize('json', req.get_response(self.api))
|
port = self.deserialize('json', req.get_response(self.api))
|
||||||
self._check_opts(opt_list,
|
self._check_opts(opt_list,
|
||||||
port['port'][edo_ext.EXTRADHCPOPTS])
|
port['port'][edo_ext.EXTRADHCPOPTS])
|
||||||
|
|
||||||
|
def test_update_port_with_blank_string_extradhcpopt(self):
|
||||||
|
opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
|
||||||
|
{'opt_name': 'tftp-server',
|
||||||
|
'opt_value': '123.123.123.123'},
|
||||||
|
{'opt_name': 'server-ip-address',
|
||||||
|
'opt_value': '123.123.123.456'}]
|
||||||
|
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': ' '}]
|
||||||
|
|
||||||
|
params = {edo_ext.EXTRADHCPOPTS: opt_list,
|
||||||
|
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
|
||||||
|
|
||||||
|
with self.port(**params) as port:
|
||||||
|
update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}}
|
||||||
|
|
||||||
|
req = self.new_update_request('ports', update_port,
|
||||||
|
port['port']['id'])
|
||||||
|
res = req.get_response(self.api)
|
||||||
|
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||||
|
|
||||||
|
def test_update_port_with_blank_name_extradhcpopt(self):
|
||||||
|
opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
|
||||||
|
{'opt_name': 'tftp-server',
|
||||||
|
'opt_value': '123.123.123.123'},
|
||||||
|
{'opt_name': 'server-ip-address',
|
||||||
|
'opt_value': '123.123.123.456'}]
|
||||||
|
upd_opts = [{'opt_name': ' ', 'opt_value': 'pxelinux.0'}]
|
||||||
|
|
||||||
|
params = {edo_ext.EXTRADHCPOPTS: opt_list,
|
||||||
|
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
|
||||||
|
|
||||||
|
with self.port(**params) as port:
|
||||||
|
update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}}
|
||||||
|
|
||||||
|
req = self.new_update_request('ports', update_port,
|
||||||
|
port['port']['id'])
|
||||||
|
res = req.get_response(self.api)
|
||||||
|
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||||
|
Loading…
Reference in New Issue
Block a user