network: make nova to handle port_security_enabled=False

In somes cases we need to have network without any security rules
applied, unfortunatly nova does not provide way to remove l3
assignements and always at least expose the default security group.
This commit updates code to clear security groups applied to the
network when option port_security_enabled=False is activated on the
network.

Change-Id: I630008a9733624a9d9b59b7aa3b8b2a3f8985d61
Closes-Bug: #1460630
Related-Bug: #1175464
This commit is contained in:
Sahid Orentino Ferdjaoui 2016-02-24 06:55:30 -05:00
parent 6ae2b4534a
commit ee7a019826
3 changed files with 284 additions and 14 deletions

View File

@ -69,6 +69,8 @@ soft_external_network_attach_authorize = extensions.soft_core_authorizer(
_SESSION = None
_ADMIN_AUTH = None
DEFAULT_SECGROUP = 'default'
def list_opts():
opts = copy.deepcopy(_neutron_options)
@ -442,6 +444,17 @@ class API(base_api.NetworkAPI):
return ports, net_ids, ordered_networks, available_macs
def _clean_security_groups(self, security_groups):
"""Cleans security groups requested from Nova API
Neutron already passes a 'default' security group when
creating ports so it's not necessary to specify it to the
request.
"""
if DEFAULT_SECGROUP in security_groups:
security_groups.remove(DEFAULT_SECGROUP)
return security_groups
def _process_security_groups(self, instance, neutron, security_groups):
"""Processes and validates requested security groups for allocation.
@ -589,7 +602,8 @@ class API(base_api.NetworkAPI):
# available net which is permitted bug/1364344
self._check_external_network_attach(context, nets)
security_groups = kwargs.get('security_groups', [])
security_groups = self._clean_security_groups(
kwargs.get('security_groups', []))
security_group_ids = self._process_security_groups(
instance, neutron, security_groups)
@ -610,17 +624,20 @@ class API(base_api.NetworkAPI):
continue
nets_in_requested_order.append(network)
# If security groups are requested on an instance then the
# network must has a subnet associated with it. Some plugins
# implement the port-security extension which requires
# 'port_security_enabled' to be True for security groups.
# That is why True is returned if 'port_security_enabled'
# is not found.
if (security_groups and not (
network['subnets']
and network.get('port_security_enabled', True))):
raise exception.SecurityGroupCannotBeApplied()
port_security_enabled = network.get('port_security_enabled', True)
if port_security_enabled:
if not network.get('subnets'):
# Neutron can't apply security groups to a port
# for a network without L3 assignements.
raise exception.SecurityGroupCannotBeApplied()
else:
if security_group_ids:
# We don't want to apply security groups on port
# for a network defined with
# 'port_security_enabled=False'.
raise exception.SecurityGroupCannotBeApplied()
zone = 'compute:%s' % instance.availability_zone
port_req_body = {'port': {'device_id': instance.uuid,
'device_owner': zone}}

View File

@ -216,6 +216,7 @@ class TestNeutronv2Base(test.TestCase):
'tenant_id': 'my_tenantid'})
self.nets3 = self.nets2 + [{'id': 'my_netid3',
'name': 'my_netname3',
'subnets': ['mysubnid3'],
'tenant_id': 'my_tenantid'}]
self.nets4 = [{'id': 'his_netid4',
'name': 'his_netname4',
@ -223,6 +224,7 @@ class TestNeutronv2Base(test.TestCase):
# A network request with external networks
self.nets5 = self.nets1 + [{'id': 'the-external-one',
'name': 'out-of-this-world',
'subnets': ['mysubnid5'],
'router:external': True,
'tenant_id': 'should-be-an-admin'}]
# A network request with a duplicate
@ -241,7 +243,8 @@ class TestNeutronv2Base(test.TestCase):
self.nets9 = []
# A network that is both shared and external
self.nets10 = [{'id': 'net_id', 'name': 'net_name',
'router:external': True, 'shared': True}]
'router:external': True, 'shared': True,
'subnets': ['mysubnid10']}]
# A network with non-blank dns_domain to test _update_port_dns_name
self.nets11 = [{'id': 'my_netid1',
'name': 'my_netname1',
@ -1098,6 +1101,16 @@ class TestNeutronv2(TestNeutronv2Base):
self._allocate_for_instance(net_idx=3,
requested_networks=requested_networks)
def test_allocate_for_instance_with_no_subnet_defined(self):
# net_id=4 does not specify subnet and does not set the option
# port_security_disabled to True, so Neutron will not been
# able to associate the default security group to the port
# requested to be created. We expect an exception to be
# raised.
self.assertRaises(exception.SecurityGroupCannotBeApplied,
self._allocate_for_instance, net_idx=4,
_break='post_list_networks')
def test_allocate_for_instance_with_invalid_network_id(self):
requested_networks = objects.NetworkRequestList(
objects=[objects.NetworkRequest(network_id='invalid_id')])
@ -3605,7 +3618,8 @@ class TestNeutronv2WithMock(test.TestCase):
objects = [objects.NetworkRequest(port_id='fake-port1'),
objects.NetworkRequest(port_id='fake-port2'),
objects.NetworkRequest(port_id='fail-port')])
mock_avail_nets.return_value = [{'id': 'net-1'}]
mock_avail_nets.return_value = [{'id': 'net-1',
'subnets': ['subnet1']}]
self.api.allocate_for_instance(mock.sentinel.ctx,
mock_inst,
@ -3722,7 +3736,8 @@ class TestNeutronv2WithMock(test.TestCase):
mock_inst = mock.Mock(project_id="proj-1",
availability_zone='zone-1',
uuid='inst-1')
mock_avail_nets.return_value = [{'id': 'net-1'}]
mock_avail_nets.return_value = [{'id': 'net-1',
'subnets': ['subnet1']}]
mock_nc.create_port.return_value = {'port': {'id': 'fake_id',
'tenant_id': mock_inst.project_id,
'binding:vif_type': 'binding_failed'}}
@ -4272,3 +4287,233 @@ class TestNeutronClientForAdminScenarios(test.NoDBTestCase):
def test_get_client_for_admin_context_with_id(self):
self._test_get_client_for_admin(use_id=True, admin_context=True)
class TestNeutronPortSecurity(test.NoDBTestCase):
@mock.patch.object(neutronapi.API, 'get_instance_nw_info')
@mock.patch.object(neutronapi.API, '_update_port_dns_name')
@mock.patch.object(neutronapi.API, '_create_port')
@mock.patch.object(neutronapi.API, '_populate_neutron_extension_values')
@mock.patch.object(neutronapi.API, '_check_external_network_attach')
@mock.patch.object(neutronapi.API, '_process_security_groups')
@mock.patch.object(neutronapi.API, '_get_available_networks')
@mock.patch.object(neutronapi.API, '_process_requested_networks')
@mock.patch.object(neutronapi.API, '_has_port_binding_extension')
@mock.patch.object(neutronapi, 'get_client')
def test_no_security_groups_requested(
self, mock_get_client, mock_has_port_binding_extension,
mock_process_requested_networks, mock_get_available_networks,
mock_process_security_groups, mock_check_external_network_attach,
mock_populate_neutron_extension_values, mock_create_port,
mock_update_port_dns_name, mock_get_instance_nw_info):
nets = [
{'id': 'net1',
'name': 'net_name1',
'subnets': ['mysubnid1'],
'port_security_enabled': True},
{'id': 'net2',
'name': 'net_name2',
'subnets': ['mysubnid2'],
'port_security_enabled': True}]
onets = objects.NetworkRequestList(objects=[
objects.NetworkRequest(network_id='net1'),
objects.NetworkRequest(network_id='net2')])
instance = objects.Instance(
project_id=1, availability_zone='nova', uuid='uuid1')
secgroups = ['default'] # Nova API provides the 'default'
mock_process_requested_networks.return_value = [
None, ['net1', 'net2'], onets, None]
mock_get_available_networks.return_value = nets
mock_process_security_groups.return_value = []
api = neutronapi.API()
api.allocate_for_instance(
'context', instance, requested_networks=onets,
security_groups=secgroups)
mock_process_security_groups.assert_called_once_with(
instance, mock.ANY, [])
mock_create_port.assert_has_calls([
mock.call(
mock.ANY, instance,
u'net1', {'port':
{'device_owner': u'compute:nova',
'device_id': 'uuid1'}},
None, [], None, None),
mock.call(
mock.ANY, instance,
u'net2', {'port':
{'device_owner': u'compute:nova',
'device_id': 'uuid1'}},
None, [], None, None)])
@mock.patch.object(neutronapi.API, 'get_instance_nw_info')
@mock.patch.object(neutronapi.API, '_update_port_dns_name')
@mock.patch.object(neutronapi.API, '_create_port')
@mock.patch.object(neutronapi.API, '_populate_neutron_extension_values')
@mock.patch.object(neutronapi.API, '_check_external_network_attach')
@mock.patch.object(neutronapi.API, '_process_security_groups')
@mock.patch.object(neutronapi.API, '_get_available_networks')
@mock.patch.object(neutronapi.API, '_process_requested_networks')
@mock.patch.object(neutronapi.API, '_has_port_binding_extension')
@mock.patch.object(neutronapi, 'get_client')
def test_security_groups_requested(
self, mock_get_client, mock_has_port_binding_extension,
mock_process_requested_networks, mock_get_available_networks,
mock_process_security_groups, mock_check_external_network_attach,
mock_populate_neutron_extension_values, mock_create_port,
mock_update_port_dns_name, mock_get_instance_nw_info):
nets = [
{'id': 'net1',
'name': 'net_name1',
'subnets': ['mysubnid1'],
'port_security_enabled': True},
{'id': 'net2',
'name': 'net_name2',
'subnets': ['mysubnid2'],
'port_security_enabled': True}]
onets = objects.NetworkRequestList(objects=[
objects.NetworkRequest(network_id='net1'),
objects.NetworkRequest(network_id='net2')])
instance = objects.Instance(
project_id=1, availability_zone='nova', uuid='uuid1')
secgroups = ['default', 'secgrp1', 'secgrp2']
mock_process_requested_networks.return_value = [
None, ['net1', 'net2'], onets, None]
mock_get_available_networks.return_value = nets
mock_process_security_groups.return_value = ['secgrp-uuid1',
'secgrp-uuid2']
api = neutronapi.API()
api.allocate_for_instance(
'context', instance, requested_networks=onets,
security_groups=secgroups)
mock_process_security_groups.assert_called_once_with(
instance, mock.ANY, ['secgrp1', 'secgrp2'])
mock_create_port.assert_has_calls([
mock.call(
mock.ANY, instance,
u'net1', {'port':
{'device_owner': u'compute:nova',
'device_id': 'uuid1'}},
None, ['secgrp-uuid1', 'secgrp-uuid2'], None, None),
mock.call(
mock.ANY, instance,
u'net2', {'port':
{'device_owner': u'compute:nova',
'device_id': 'uuid1'}},
None, ['secgrp-uuid1', 'secgrp-uuid2'], None, None)])
@mock.patch.object(neutronapi.API, 'get_instance_nw_info')
@mock.patch.object(neutronapi.API, '_update_port_dns_name')
@mock.patch.object(neutronapi.API, '_create_port')
@mock.patch.object(neutronapi.API, '_populate_neutron_extension_values')
@mock.patch.object(neutronapi.API, '_check_external_network_attach')
@mock.patch.object(neutronapi.API, '_process_security_groups')
@mock.patch.object(neutronapi.API, '_get_available_networks')
@mock.patch.object(neutronapi.API, '_process_requested_networks')
@mock.patch.object(neutronapi.API, '_has_port_binding_extension')
@mock.patch.object(neutronapi, 'get_client')
def test_port_security_disabled_no_security_groups_requested(
self, mock_get_client, mock_has_port_binding_extension,
mock_process_requested_networks, mock_get_available_networks,
mock_process_security_groups, mock_check_external_network_attach,
mock_populate_neutron_extension_values, mock_create_port,
mock_update_port_dns_name, mock_get_instance_nw_info):
nets = [
{'id': 'net1',
'name': 'net_name1',
'subnets': ['mysubnid1'],
'port_security_enabled': False},
{'id': 'net2',
'name': 'net_name2',
'subnets': ['mysubnid2'],
'port_security_enabled': False}]
onets = objects.NetworkRequestList(objects=[
objects.NetworkRequest(network_id='net1'),
objects.NetworkRequest(network_id='net2')])
instance = objects.Instance(
project_id=1, availability_zone='nova', uuid='uuid1')
secgroups = ['default'] # Nova API provides the 'default'
mock_process_requested_networks.return_value = [
None, ['net1', 'net2'], onets, None]
mock_get_available_networks.return_value = nets
mock_process_security_groups.return_value = []
api = neutronapi.API()
api.allocate_for_instance(
'context', instance, requested_networks=onets,
security_groups=secgroups)
mock_process_security_groups.assert_called_once_with(
instance, mock.ANY, [])
mock_create_port.assert_has_calls([
mock.call(
mock.ANY, instance,
u'net1', {'port':
{'device_owner': u'compute:nova',
'device_id': 'uuid1'}},
None, [], None, None),
mock.call(
mock.ANY, instance,
u'net2', {'port':
{'device_owner': u'compute:nova',
'device_id': 'uuid1'}},
None, [], None, None)])
@mock.patch.object(neutronapi.API, 'get_instance_nw_info')
@mock.patch.object(neutronapi.API, '_update_port_dns_name')
@mock.patch.object(neutronapi.API, '_create_port')
@mock.patch.object(neutronapi.API, '_populate_neutron_extension_values')
@mock.patch.object(neutronapi.API, '_check_external_network_attach')
@mock.patch.object(neutronapi.API, '_process_security_groups')
@mock.patch.object(neutronapi.API, '_get_available_networks')
@mock.patch.object(neutronapi.API, '_process_requested_networks')
@mock.patch.object(neutronapi.API, '_has_port_binding_extension')
@mock.patch.object(neutronapi, 'get_client')
def test_port_security_disabled_and_security_groups_requested(
self, mock_get_client, mock_has_port_binding_extension,
mock_process_requested_networks, mock_get_available_networks,
mock_process_security_groups, mock_check_external_network_attach,
mock_populate_neutron_extension_values, mock_create_port,
mock_update_port_dns_name, mock_get_instance_nw_info):
nets = [
{'id': 'net1',
'name': 'net_name1',
'subnets': ['mysubnid1'],
'port_security_enabled': True},
{'id': 'net2',
'name': 'net_name2',
'subnets': ['mysubnid2'],
'port_security_enabled': False}]
onets = objects.NetworkRequestList(objects=[
objects.NetworkRequest(network_id='net1'),
objects.NetworkRequest(network_id='net2')])
instance = objects.Instance(
project_id=1, availability_zone='nova', uuid='uuid1')
secgroups = ['default', 'secgrp1', 'secgrp2']
mock_process_requested_networks.return_value = [
None, ['net1', 'net2'], onets, None]
mock_get_available_networks.return_value = nets
mock_process_security_groups.return_value = ['secgrp-uuid1',
'secgrp-uuid2']
api = neutronapi.API()
self.assertRaises(
exception.SecurityGroupCannotBeApplied,
api.allocate_for_instance,
'context', instance, requested_networks=onets,
security_groups=secgroups)
mock_process_security_groups.assert_called_once_with(
instance, mock.ANY, ['secgrp1', 'secgrp2'])

View File

@ -0,0 +1,8 @@
---
issues:
- When using Neutron extension 'port_security' and booting an
instance on a network with 'port_security_enabled=False' the Nova
API response says there is a 'default' security group attached to
the instance which is incorrect. However when listing security groups
for the instance there are none listed, which is correct. The API
response will be fixed separately with a microversion.