Add utility function for checking trusted port

Ports that have a device_owner that starts with 'network:'
are trusted in several places throughout the codebase. Each
of these did a startswith check on each field and it's not
immediately obvious why it's done.

This patch adds a utility function called 'is_port_trusted'
that performs the same check and makes it obvious what is
being done.

Change-Id: I542c753776d5cfb2fd736b25ea6e111867c89c89
This commit is contained in:
Kevin Benton 2015-09-03 10:01:40 -07:00 committed by Kevin Benton
parent 710ab239e4
commit 638d16c8a0
6 changed files with 27 additions and 17 deletions

View File

@ -19,6 +19,7 @@ from oslo_log import log as logging
from neutron.common import constants
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron.i18n import _LW
from neutron import manager
@ -80,7 +81,7 @@ class SecurityGroupServerRpcCallback(object):
return dict(
(port['id'], port)
for port in self.plugin.get_ports_from_devices(context, devices)
if port and not port['device_owner'].startswith('network:')
if port and not utils.is_port_trusted(port)
)
def security_group_rules_for_devices(self, context, **kwargs):

View File

@ -432,6 +432,15 @@ def ip_version_from_int(ip_version_int):
raise ValueError(_('Illegal IP version number'))
def is_port_trusted(port):
"""Used to determine if port can be trusted not to attack network.
Trust is currently based on the device_owner field starting with 'network:'
since we restrict who can use that in the default policy.json file.
"""
return port['device_owner'].startswith('network:')
class DelayedStringRenderer(object):
"""Takes a callable and its args and calls when __str__ is called

View File

@ -13,6 +13,7 @@
# under the License.
from neutron.api.v2 import attributes as attrs
from neutron.common import utils
from neutron.db import db_base_plugin_v2
from neutron.db import portsecurity_db_common
from neutron.extensions import portsecurity as psec
@ -40,8 +41,7 @@ class PortSecurityDbMixin(portsecurity_db_common.PortSecurityDbCommon):
"""
has_ip = self._ip_on_port(port)
# we don't apply security groups for dhcp, router
if (port.get('device_owner') and
port['device_owner'].startswith('network:')):
if port.get('device_owner') and utils.is_port_trusted(port):
return (False, has_ip)
if attrs.is_attr_set(port.get(psec.PORTSECURITY)):

View File

@ -686,15 +686,15 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
:returns: all security groups IDs on port belonging to tenant.
"""
p = port['port']
if not attributes.is_attr_set(p.get(ext_sg.SECURITYGROUPS)):
port = port['port']
if not attributes.is_attr_set(port.get(ext_sg.SECURITYGROUPS)):
return
if p.get('device_owner') and p['device_owner'].startswith('network:'):
if port.get('device_owner') and utils.is_port_trusted(port):
return
port_sg = p.get(ext_sg.SECURITYGROUPS, [])
port_sg = port.get(ext_sg.SECURITYGROUPS, [])
filters = {'id': port_sg}
tenant_id = p.get('tenant_id')
tenant_id = port.get('tenant_id')
if tenant_id:
filters['tenant_id'] = [tenant_id]
valid_groups = set(g['id'] for g in
@ -710,14 +710,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
def _ensure_default_security_group_on_port(self, context, port):
# we don't apply security groups for dhcp, router
if (port['port'].get('device_owner') and
port['port']['device_owner'].startswith('network:')):
port = port['port']
if port.get('device_owner') and utils.is_port_trusted(port):
return
tenant_id = self._get_tenant_id_for_create(context,
port['port'])
tenant_id = self._get_tenant_id_for_create(context, port)
default_sg = self._ensure_default_security_group(context, tenant_id)
if not attributes.is_attr_set(port['port'].get(ext_sg.SECURITYGROUPS)):
port['port'][ext_sg.SECURITYGROUPS] = [default_sg]
if not attributes.is_attr_set(port.get(ext_sg.SECURITYGROUPS)):
port[ext_sg.SECURITYGROUPS] = [default_sg]
def _check_update_deletes_security_groups(self, port):
"""Return True if port has as a security group and it's value

View File

@ -18,6 +18,7 @@ from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron.agent.linux import ip_lib
from neutron.common import utils
from neutron.i18n import _LI
LOG = logging.getLogger(__name__)
@ -32,7 +33,7 @@ def setup_arp_spoofing_protection(vif, port_details):
LOG.info(_LI("Skipping ARP spoofing rules for port '%s' because "
"it has port security disabled"), vif)
return
if port_details['device_owner'].startswith('network:'):
if utils.is_port_trusted(port_details):
# clear any previous entries related to this port
delete_arp_spoofing_protection([vif], current_rules)
LOG.debug("Skipping ARP spoofing rules for network owned port "

View File

@ -14,6 +14,7 @@
# under the License.
from neutron.api.v2 import attributes as attrs
from neutron.common import utils
from neutron.db import common_db_mixin
from neutron.db import portsecurity_db_common as ps_db_common
from neutron.extensions import portsecurity as psec
@ -80,8 +81,7 @@ class PortSecurityExtensionDriver(api.ExtensionDriver,
otherwise the value associated with the network is returned.
"""
# we don't apply security groups for dhcp, router
if (port.get('device_owner') and
port['device_owner'].startswith('network:')):
if port.get('device_owner') and utils.is_port_trusted(port):
return False
if attrs.is_attr_set(port.get(psec.PORTSECURITY)):