Use push notification for security groups

Calculate all security group info on the agent from
the push notification cache.

Partially-Implements: blueprint push-notifications
Change-Id: I5c74ba17223a431dad924d31bbe08ad958de3877
This commit is contained in:
Kevin Benton 2017-01-22 17:04:41 -08:00
parent 715be3e3b1
commit 020ea1479e
3 changed files with 262 additions and 4 deletions

View File

@ -12,15 +12,21 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron_lib.plugins import directory
from neutron_lib.utils import net
from oslo_log import log as logging
import oslo_messaging
from neutron._i18n import _LW
from neutron.api.rpc.handlers import resources_rpc
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.common import constants
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import securitygroups_rpc_base as sg_rpc_base
LOG = logging.getLogger(__name__)
@ -221,3 +227,156 @@ class SecurityGroupAgentRpcCallbackMixin(object):
generates the notification for agents running older versions that have
IP-specific rules.
"""
class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin):
"""Agent-side replacement for SecurityGroupServerRpcApi using local data.
This provides the same methods as SecurityGroupServerRpcApi but it reads
from the updates delivered to the push notifications cache rather than
calling the server.
"""
def __init__(self, rcache):
self.rcache = rcache
registry.subscribe(self._clear_child_sg_rules, 'SecurityGroup',
events.AFTER_DELETE)
registry.subscribe(self._add_child_sg_rules, 'SecurityGroup',
events.AFTER_UPDATE)
# set this attr so agent can adjust the timeout of the client
self.client = resources_rpc.ResourcesPullRpcApi().client
def register_legacy_sg_notification_callbacks(self, sg_agent):
self._sg_agent = sg_agent
registry.subscribe(self._handle_sg_rule_delete,
'SecurityGroupRule', events.AFTER_DELETE)
registry.subscribe(self._handle_sg_rule_update,
'SecurityGroupRule', events.AFTER_UPDATE)
registry.subscribe(self._handle_sg_member_delete,
'Port', events.AFTER_DELETE)
registry.subscribe(self._handle_sg_member_update,
'Port', events.AFTER_UPDATE)
def security_group_info_for_devices(self, context, devices):
ports = self._get_devices_info(context, devices)
result = self.security_group_info_for_ports(context, ports)
return result
def security_group_rules_for_devices(self, context, devices):
# this is the legacy method that should never be called since
# security_group_info_for_devices will never throw an unsupported
# error.
raise NotImplementedError()
def _add_child_sg_rules(self, rtype, event, trigger, context, updated,
**kwargs):
# whenever we receive a full security group, add all child rules
# because the server won't emit events for the individual rules on
# creation.
for rule in updated.rules:
self.rcache.record_resource_update(context, 'SecurityGroupRule',
rule)
def _clear_child_sg_rules(self, rtype, event, trigger, context, existing,
**kwargs):
if not existing:
return
# the server can delete an entire security group without notifying
# about the security group rules. so we need to emulate a rule deletion
# when a security group is removed.
filters = {'security_group_id': (existing.id, )}
for rule in self.rcache.get_resources('SecurityGroupRule', filters):
self.rcache.record_resource_delete(context, 'SecurityGroupRule',
rule.id)
def _handle_sg_rule_delete(self, rtype, event, trigger, context, existing,
**kwargs):
if not existing:
return
sg_id = existing.security_group_id
self._sg_agent.security_groups_rule_updated([sg_id])
def _handle_sg_rule_update(self, rtype, event, trigger, context, existing,
updated, **kwargs):
sg_id = updated.security_group_id
self._sg_agent.security_groups_rule_updated([sg_id])
def _handle_sg_member_delete(self, rtype, event, trigger, context,
existing, **kwargs):
# received on port delete
sgs = set(existing.security_group_ids) if existing else set()
if sgs:
self._sg_agent.security_groups_member_updated(sgs)
def _handle_sg_member_update(self, rtype, event, trigger, context,
existing, updated, changed_fields, **kwargs):
# received on port update
sgs = set(existing.security_group_ids) if existing else set()
if not changed_fields.intersection({'security_group_ids', 'fixed_ips',
'allowed_address_pairs'}):
# none of the relevant fields to SG calculations changed
return
sgs.update({sg_id for sg_id in updated.security_group_ids})
if sgs:
self._sg_agent.security_groups_member_updated(sgs)
def _get_devices_info(self, context, devices):
# NOTE(kevinbenton): this format is required by the sg code, it is
# defined in get_port_from_device and mimics
# make_port_dict_with_security_groups in ML2 db
result = {}
for device in devices:
ovo = self.rcache.get_resource_by_id('Port', device)
if not ovo:
continue
port = ovo.to_dict()
# the caller expects trusted ports to be excluded from the result
if net.is_port_trusted(port):
continue
port['security_groups'] = list(ovo.security_group_ids)
port['security_group_rules'] = []
port['security_group_source_groups'] = []
port['fixed_ips'] = [str(f['ip_address'])
for f in port['fixed_ips']]
# NOTE(kevinbenton): this id==device is only safe for OVS. a lookup
# will be required for linux bridge and others that don't have the
# full port UUID
port['device'] = port['id']
result[device] = port
return result
def _select_ips_for_remote_group(self, context, remote_group_ids):
if not remote_group_ids:
return {}
ips_by_group = {rg: set() for rg in remote_group_ids}
filters = {'security_group_ids': tuple(remote_group_ids)}
for p in self.rcache.get_resources('Port', filters):
port_ips = [str(addr.ip_address)
for addr in p.fixed_ips + p.allowed_address_pairs]
for sg_id in p.security_group_ids:
if sg_id in ips_by_group:
ips_by_group[sg_id].update(set(port_ips))
return ips_by_group
def _select_rules_for_ports(self, context, ports):
if not ports:
return []
results = []
sg_ids = set((sg_id for p in ports.values()
for sg_id in p['security_group_ids']))
rules_by_sgid = collections.defaultdict(list)
for sg_id in sg_ids:
filters = {'security_group_id': (sg_id, )}
for r in self.rcache.get_resources('SecurityGroupRule', filters):
rules_by_sgid[r.security_group_id].append(r)
for p in ports.values():
for sg_id in p['security_group_ids']:
for rule in rules_by_sgid[sg_id]:
results.append((p['id'], rule.to_dict()))
return results
def _select_sg_ids_for_ports(self, context, ports):
sg_ids = set((sg_id for p in ports.values()
for sg_id in p['security_group_ids']))
return [(sg_id, ) for sg_id in sg_ids]

View File

@ -89,8 +89,7 @@ def has_zero_prefixlen_address(ip_addresses):
@profiler.trace_cls("rpc")
class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
l2population_rpc.L2populationRpcCallBackTunnelMixin,
class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
dvr_rpc.DVRAgentRpcCallbackMixin):
'''Implements OVS-based tunneling, VLANs and flat networks.
@ -236,6 +235,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.sg_agent = agent_sg_rpc.SecurityGroupAgentRpc(
self.context, self.sg_plugin_rpc, defer_refresh_firewall=True,
integration_bridge=self.int_br)
self.sg_plugin_rpc.register_legacy_sg_notification_callbacks(
self.sg_agent)
# we default to False to provide backward compat with out of tree
# firewall drivers that expect the logic that existed on the Neutron
@ -361,7 +362,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
# allow us to receive port_update/delete callbacks from the cache
self.plugin_rpc.register_legacy_notification_callbacks(self)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerAPIShim(
self.plugin_rpc.remote_resource_cache)
self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
@ -370,7 +372,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# Define the listening consumers for the agent
consumers = [[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
[topics.DVR, topics.UPDATE]]
if self.l2_pop:
consumers.append([topics.L2POPULATION, topics.UPDATE])

View File

@ -13,8 +13,15 @@
# under the License.
import mock
from neutron_lib import context
from oslo_utils import uuidutils
from neutron.agent import resource_cache
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron import objects
from neutron.objects import ports
from neutron.objects import securitygroup
from neutron.tests import base
@ -57,3 +64,94 @@ class SGAgentRpcCallBackMixinTestCase(base.BaseTestCase):
self.rpc.security_groups_provider_updated(None)
# this is now a NOOP on the agent side. provider rules don't change
self.assertFalse(self.rpc.sg_agent.called)
class SecurityGroupServerAPIShimTestCase(base.BaseTestCase):
def setUp(self):
super(SecurityGroupServerAPIShimTestCase, self).setUp()
objects.register_objects()
resource_types = [resources.PORT, resources.SECURITYGROUP,
resources.SECURITYGROUPRULE]
self.rcache = resource_cache.RemoteResourceCache(resource_types)
# prevent any server lookup attempts
mock.patch.object(self.rcache, '_flood_cache_for_query').start()
self.shim = securitygroups_rpc.SecurityGroupServerAPIShim(self.rcache)
self.sg_agent = mock.Mock()
self.shim.register_legacy_sg_notification_callbacks(self.sg_agent)
self.ctx = context.get_admin_context()
def _make_port_ovo(self, ip, **kwargs):
attrs = {'id': uuidutils.generate_uuid(),
'network_id': uuidutils.generate_uuid(),
'security_group_ids': set(),
'device_owner': 'compute:None',
'allowed_address_pairs': []}
attrs['fixed_ips'] = [ports.IPAllocation(
port_id=attrs['id'], subnet_id=uuidutils.generate_uuid(),
network_id=attrs['network_id'], ip_address=ip)]
attrs.update(**kwargs)
p = ports.Port(self.ctx, **attrs)
self.rcache.record_resource_update(self.ctx, 'Port', p)
return p
def _make_security_group_ovo(self, **kwargs):
attrs = {'id': uuidutils.generate_uuid(), 'revision_number': 1}
sg_rule = securitygroup.SecurityGroupRule(
id=uuidutils.generate_uuid(),
security_group_id=attrs['id'],
direction='ingress',
ethertype='IPv4', protocol='tcp',
port_range_min=400,
remote_group_id=attrs['id'],
revision_number=1,
)
attrs['rules'] = [sg_rule]
attrs.update(**kwargs)
sg = securitygroup.SecurityGroup(self.ctx, **attrs)
self.rcache.record_resource_update(self.ctx, 'SecurityGroup', sg)
return sg
def test_sg_parent_ops_affect_rules(self):
s1 = self._make_security_group_ovo()
filters = {'security_group_id': (s1.id, )}
self.assertEqual(
s1.rules,
self.rcache.get_resources('SecurityGroupRule', filters))
self.sg_agent.security_groups_rule_updated.assert_called_once_with(
[s1.id])
self.sg_agent.security_groups_rule_updated.reset_mock()
self.rcache.record_resource_delete(self.ctx, 'SecurityGroup', s1.id)
self.assertEqual(
[],
self.rcache.get_resources('SecurityGroupRule', filters))
self.sg_agent.security_groups_rule_updated.assert_called_once_with(
[s1.id])
def test_security_group_info_for_devices(self):
s1 = self._make_security_group_ovo()
p1 = self._make_port_ovo(ip='1.1.1.1', security_group_ids={s1.id})
p2 = self._make_port_ovo(ip='2.2.2.2', security_group_ids={s1.id})
p3 = self._make_port_ovo(ip='3.3.3.3', security_group_ids={s1.id},
device_owner='network:dhcp')
ids = [p1.id, p2.id, p3.id]
info = self.shim.security_group_info_for_devices(self.ctx, ids)
self.assertIn('1.1.1.1', info['sg_member_ips'][s1.id]['IPv4'])
self.assertIn('2.2.2.2', info['sg_member_ips'][s1.id]['IPv4'])
self.assertIn('3.3.3.3', info['sg_member_ips'][s1.id]['IPv4'])
self.assertIn(p1.id, info['devices'].keys())
self.assertIn(p2.id, info['devices'].keys())
# P3 is a trusted port so it doesn't have rules
self.assertNotIn(p3.id, info['devices'].keys())
self.assertEqual([s1.id], list(info['security_groups'].keys()))
def test_sg_member_update_events(self):
s1 = self._make_security_group_ovo()
p1 = self._make_port_ovo(ip='1.1.1.1', security_group_ids={s1.id})
self._make_port_ovo(ip='2.2.2.2', security_group_ids={s1.id})
self.sg_agent.security_groups_member_updated.assert_called_with(
{s1.id})
self.sg_agent.security_groups_member_updated.reset_mock()
self.rcache.record_resource_delete(self.ctx, 'Port', p1.id)
self.sg_agent.security_groups_member_updated.assert_called_with(
{s1.id})