Adds delete of a extra_dhcp_opt on a port

Add support for delete of extra_dhcp_opt(s) on a port. Where
[{'opt_name': 'opt_to_delete', 'opt_value': None}].

Closes-Bug: #1228008

Change-Id: I75baeff91575cac6546fe2cc6fcf7a0d8e92853f
This commit is contained in:
dekehn 2013-09-10 10:13:29 -06:00
parent 0e2198ef75
commit 77aee9e715
4 changed files with 97 additions and 40 deletions

View File

@ -74,6 +74,11 @@ def _validate_values(data, valid_values=None):
return msg
def _validate_string_or_none(data, max_len=None):
if data is not None:
return _validate_string(data, max_len=None)
def _validate_string(data, max_len=None):
if not isinstance(data, basestring):
msg = _("'%s' is not a valid string") % data
@ -521,6 +526,7 @@ validators = {'type:dict': _validate_dict,
'type:range': _validate_range,
'type:regex': _validate_regex,
'type:string': _validate_string,
'type:string_or_none': _validate_string_or_none,
'type:subnet': _validate_subnet,
'type:subnet_list': _validate_subnet_list,
'type:uuid': _validate_uuid,

View File

@ -62,11 +62,12 @@ class ExtraDhcpOptMixin(object):
return port
with context.session.begin(subtransactions=True):
for dopt in extra_dhcp_opts:
db = ExtraDhcpOpt(
port_id=port['id'],
opt_name=dopt['opt_name'],
opt_value=dopt['opt_value'])
context.session.add(db)
if dopt['opt_value']:
db = ExtraDhcpOpt(
port_id=port['id'],
opt_name=dopt['opt_name'],
opt_value=dopt['opt_value'])
context.session.add(db)
return self._extend_port_extra_dhcp_opts_dict(context, port)
def _extend_port_extra_dhcp_opts_dict(self, context, port):
@ -90,25 +91,19 @@ class ExtraDhcpOptMixin(object):
context, ExtraDhcpOpt).filter_by(port_id=id).all()
# if there are currently no dhcp_options associated to
# this port, Then just insert the new ones and be done.
if not opt_db:
with context.session.begin(subtransactions=True):
for dopt in dopts:
db = ExtraDhcpOpt(
port_id=id,
opt_name=dopt['opt_name'],
opt_value=dopt['opt_value'])
context.session.add(db)
else:
with context.session.begin(subtransactions=True):
for upd_rec in dopts:
with context.session.begin(subtransactions=True):
for opt in opt_db:
if opt['opt_name'] == upd_rec['opt_name']:
if opt['opt_value'] != upd_rec['opt_value']:
opt.update(
{'opt_value': upd_rec['opt_value']})
break
# this handles the adding an option that didn't exist.
else:
for opt in opt_db:
if opt['opt_name'] == upd_rec['opt_name']:
# to handle deleting of a opt from the port.
if upd_rec['opt_value'] is None:
context.session.delete(opt)
elif opt['opt_value'] != upd_rec['opt_value']:
opt.update(
{'opt_value': upd_rec['opt_value']})
break
else:
if upd_rec['opt_value'] is not None:
db = ExtraDhcpOpt(
port_id=id,
opt_name=upd_rec['opt_name'],

View File

@ -55,7 +55,8 @@ EXTENDED_ATTRIBUTES_2_0 = {
'type:list_of_dict_or_none': {
'id': {'type:uuid': None, 'required': False},
'opt_name': {'type:string': None, 'required': True},
'opt_value': {'type:string': None, 'required': True}}}}}}
'opt_value': {'type:string_or_none': None,
'required': True}}}}}}
class Extra_dhcp_opt(extensions.ExtensionDescriptor):

View File

@ -72,34 +72,53 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
self.assertEqual(opt['opt_value'], val)
def test_create_port_with_extradhcpopts(self):
opt_dict = [{'opt_name': 'bootfile-name',
opt_list = [{'opt_name': 'bootfile-name',
'opt_value': 'pxelinux.0'},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'},
{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'}]
params = {edo_ext.EXTRADHCPOPTS: opt_dict,
params = {edo_ext.EXTRADHCPOPTS: opt_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port:
self._check_opts(opt_dict,
self._check_opts(opt_list,
port['port'][edo_ext.EXTRADHCPOPTS])
def test_create_port_with_none_extradhcpopts(self):
new_list = [{'opt_name': 'bootfile-name',
'opt_value': None},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'},
{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'}]
new_dict = [{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'},
{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'}]
params = {edo_ext.EXTRADHCPOPTS: new_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port:
self._check_opts(new_dict,
port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_with_extradhcpopts_with_same(self):
opt_dict = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
new_opts = opt_dict[:]
new_opts = opt_list[:]
for i in new_opts:
if i['opt_name'] == upd_opts[0]['opt_name']:
i['opt_value'] = upd_opts[0]['opt_value']
break
params = {edo_ext.EXTRADHCPOPTS: opt_dict,
params = {edo_ext.EXTRADHCPOPTS: opt_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port:
@ -112,19 +131,19 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_with_extradhcpopts(self):
opt_dict = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
new_opts = copy.deepcopy(opt_dict)
new_opts = copy.deepcopy(opt_list)
for i in new_opts:
if i['opt_name'] == upd_opts[0]['opt_name']:
i['opt_value'] = upd_opts[0]['opt_value']
break
params = {edo_ext.EXTRADHCPOPTS: opt_dict,
params = {edo_ext.EXTRADHCPOPTS: opt_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port:
@ -136,16 +155,16 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
self._check_opts(new_opts,
port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_with_extradhcpopt1(self):
opt_dict = [{'opt_name': 'tftp-server',
def test_update_port_with_additional_extradhcpopt(self):
opt_list = [{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': 'changeme.0'}]
new_opts = copy.deepcopy(opt_dict)
new_opts = copy.deepcopy(opt_list)
new_opts.append(upd_opts[0])
params = {edo_ext.EXTRADHCPOPTS: opt_dict,
params = {edo_ext.EXTRADHCPOPTS: opt_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port:
@ -157,17 +176,53 @@ class TestExtraDhcpOpt(ExtraDhcpOptDBTestCase):
self._check_opts(new_opts,
port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_with_extradhcpopt_delete(self):
opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}]
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': None}]
new_opts = []
new_opts = [opt for opt in opt_list
if opt['opt_name'] != 'bootfile-name']
params = {edo_ext.EXTRADHCPOPTS: opt_list,
'arg_list': (edo_ext.EXTRADHCPOPTS,)}
with self.port(**params) as port:
update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}}
req = self.new_update_request('ports', update_port,
port['port']['id'])
port = self.deserialize('json', req.get_response(self.api))
self._check_opts(new_opts,
port['port'][edo_ext.EXTRADHCPOPTS])
def test_update_port_without_extradhcpopt_delete(self):
upd_opts = [{'opt_name': 'bootfile-name', 'opt_value': None}]
with self.port() as port:
update_port = {'port': {edo_ext.EXTRADHCPOPTS: upd_opts}}
req = self.new_update_request('ports', update_port,
port['port']['id'])
port = self.deserialize('json', req.get_response(self.api))
edo_attr = port['port'].get(edo_ext.EXTRADHCPOPTS)
self.assertEqual(edo_attr, [])
def test_update_port_adding_extradhcpopts(self):
opt_dict = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
opt_list = [{'opt_name': 'bootfile-name', 'opt_value': 'pxelinux.0'},
{'opt_name': 'tftp-server',
'opt_value': '123.123.123.123'},
{'opt_name': 'server-ip-address',
'opt_value': '123.123.123.456'}]
with self.port() as port:
update_port = {'port': {edo_ext.EXTRADHCPOPTS: opt_dict}}
update_port = {'port': {edo_ext.EXTRADHCPOPTS: opt_list}}
req = self.new_update_request('ports', update_port,
port['port']['id'])
port = self.deserialize('json', req.get_response(self.api))
self._check_opts(opt_dict,
self._check_opts(opt_list,
port['port'][edo_ext.EXTRADHCPOPTS])