[NSX-P] Relax network update provider attribute validation

Allow updates in physical network id for network of type l3_ext
in order to allow for re-wiring an external network to a different
NSX Tier-0 GW router.

Change-Id: I1e3dc1ed874c78d9db43a31ddf29f413e530a808
This commit is contained in:
Salvatore Orlando 2021-04-22 08:13:24 -07:00
parent 1d6fbdf15b
commit 201e39868e
2 changed files with 117 additions and 5 deletions

View File

@ -877,6 +877,30 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
if nsx_id in NET_NSX_2_NEUTRON_ID_CACHE:
del NET_NSX_2_NEUTRON_ID_CACHE[nsx_id]
def _raise_if_updates_provider_attributes(self, original_net, net_data):
# Neutron does not support changing provider network values
# except for chaging provider physical network on networks whose
# provider network_type is l3_ext
try:
utils.raise_if_updates_provider_attributes(net_data)
# No provider update
except n_exc.InvalidInput as e:
# if we end here provider attributes were specified
original_net_type = original_net.get(pnet_apidef.NETWORK_TYPE)
if original_net_type == utils.NetworkTypes.L3_EXT:
if (not validators.is_attr_set(net_data.get(
pnet_apidef.SEGMENTATION_ID)) and validators.is_attr_set(
net_data.get(pnet_apidef.PHYSICAL_NETWORK))):
# Ensure valid T0 router
self._validate_external_net_create(
net_data, None, self._tier0_validator)
LOG.info("Allowing update of l3_ext provider network "
"%s with physical network %s",
original_net['id'],
net_data.get(pnet_apidef.PHYSICAL_NETWORK))
return net_data.get(pnet_apidef.PHYSICAL_NETWORK)
raise e
def update_network(self, context, network_id, network):
original_net = super(NsxPolicyPlugin, self).get_network(
context, network_id)
@ -887,21 +911,45 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
net_data)
self._assert_on_resource_admin_state_down(net_data)
# Neutron does not support changing provider network values
utils.raise_if_updates_provider_attributes(net_data)
# Physical network for l3_ext can be changed
new_provider_phy_net = self._raise_if_updates_provider_attributes(
original_net, net_data)
extern_net = self._network_is_external(context, network_id)
is_nsx_net = self._network_is_nsx_net(context, network_id)
# Update the neutron network
updated_net = super(NsxPolicyPlugin, self).update_network(
context, network_id, network)
with db_api.CONTEXT_WRITER.using(context):
updated_net = super(NsxPolicyPlugin, self).update_network(
context, network_id, network)
if new_provider_phy_net:
# Save provider network fields, needed by get_network()
curr_bindings = nsx_db.get_network_bindings(
context.session, network_id)
if curr_bindings:
nsx_db.delete_network_bindings(
context.session, network_id)
net_type = curr_bindings[0].binding_type
vlan_id = curr_bindings[0].vlan_id
else:
# This should never happen. However safe defaults can be
# set easily
LOG.warning("Bindings for provider network %s not found. "
"Setting defaults", network_id)
net_type = utils.NetworkTypes.L3_EXT
vlan_id = 0
net_bindings = [nsx_db.add_network_binding(
context.session, network_id,
net_type, new_provider_phy_net, vlan_id)]
self._extend_network_dict_provider(context, updated_net,
bindings=net_bindings)
self._extension_manager.process_update_network(context, net_data,
updated_net)
if psec.PORTSECURITY in net_data:
self._process_network_port_security_update(
context, net_data, updated_net)
self._process_l3_update(context, updated_net, network['network'])
self._extend_network_dict_provider(context, updated_net)
if qos_consts.QOS_POLICY_ID in net_data:
# attach the policy to the network in neutron DB

View File

@ -592,6 +592,70 @@ class NsxPTestNetworks(test_db_base_plugin_v2.TestNetworksV2,
network['id'], data)
self.assertEqual(False, res['port_security_enabled'])
def test_update_network_l3_ext_provider(self):
with self._create_l3_ext_network() as network:
net_data = network['network']
data = {'network': {'id': net_data['id'],
'provider:network_type': 'l3_ext',
'provider:physical_network': 'other'}}
res = self.plugin.update_network(context.get_admin_context(),
net_data['id'], data)
self.assertEqual('other', res['provider:physical_network'])
def test_update_network_l3_ext_provider_segmentation_id_fails(self):
with self._create_l3_ext_network() as network:
net_data = network['network']
data = {'network': {'id': net_data['id'],
'provider:network_type': 'l3_ext',
'provider:physical_network': 'other',
'provider:segmentation_id': 666}}
self.assertRaises(n_exc.InvalidInput,
self.plugin.update_network,
context.get_admin_context(),
net_data['id'], data)
def test_update_network_l3_ext_provider_no_original_prov_fails(self):
with self.network(name='test_no_prov') as network:
net_data = network['network']
data = {'network': {'id': net_data['id'],
'provider:network_type': 'l3_ext',
'provider:physical_network': 'other',
'provider:segmentation_id': 666}}
self.assertRaises(n_exc.InvalidInput,
self.plugin.update_network,
context.get_admin_context(),
net_data['id'], data)
def test_update_network_no_prov_does_not_update_bindings(self):
with mock.patch(
'vmware_nsx.db.db.delete_network_bindings') as mock_del_bindings,\
mock.patch(
'vmware_nsx.db.db.add_network_binding') as mock_add_bindings,\
self.network(name='test_no_prov') as network:
net_data = network['network']
data = {'network': {'id': net_data['id'],
'name': 'new_name'}}
res = self.plugin.update_network(context.get_admin_context(),
net_data['id'], data)
self.assertEqual('new_name', res['name'])
mock_del_bindings.assert_not_called()
mock_add_bindings.assert_not_called()
def test_update_network_l3_ext_provider_other_original_prov_fails(self):
providernet_args = {pnet.NETWORK_TYPE: 'geneve'}
with self.network(name='test_geneve_net',
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE, )) as network:
net_data = network['network']
data = {'network': {'id': net_data['id'],
'provider:network_type': 'l3_ext',
'provider:physical_network': 'other',
'provider:segmentation_id': 666}}
self.assertRaises(n_exc.InvalidInput,
self.plugin.update_network,
context.get_admin_context(),
net_data['id'], data)
class NsxPTestPorts(common_v3.NsxV3TestPorts,
common_v3.NsxV3SubnetMixin,