From 020ea1479e3c2fac3e1ef99ed85f45792b286a0e Mon Sep 17 00:00:00 2001
From: Kevin Benton
Date: Sun, 22 Jan 2017 17:04:41 -0800
Subject: [PATCH] Use push notification for security groups

Calculate all security group info on the agent from the push
notification cache.

Partially-Implements: blueprint push-notifications
Change-Id: I5c74ba17223a431dad924d31bbe08ad958de3877
---
 .../api/rpc/handlers/securitygroups_rpc.py   | 159 ++++++++++++++++++
 .../openvswitch/agent/ovs_neutron_agent.py   |   9 +-
 .../rpc/handlers/test_securitygroups_rpc.py  |  98 +++++++++++
 3 files changed, 262 insertions(+), 4 deletions(-)

diff --git a/neutron/api/rpc/handlers/securitygroups_rpc.py b/neutron/api/rpc/handlers/securitygroups_rpc.py
index e3a32f027d6..ccd2488c398 100644
--- a/neutron/api/rpc/handlers/securitygroups_rpc.py
+++ b/neutron/api/rpc/handlers/securitygroups_rpc.py
@@ -12,15 +12,21 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import collections
+
 from neutron_lib.plugins import directory
 from neutron_lib.utils import net
 from oslo_log import log as logging
 import oslo_messaging
 
 from neutron._i18n import _LW
+from neutron.api.rpc.handlers import resources_rpc
+from neutron.callbacks import events
+from neutron.callbacks import registry
 from neutron.common import constants
 from neutron.common import rpc as n_rpc
 from neutron.common import topics
+from neutron.db import securitygroups_rpc_base as sg_rpc_base
 
 LOG = logging.getLogger(__name__)
 
@@ -221,3 +227,156 @@ class SecurityGroupAgentRpcCallbackMixin(object):
         generates the notification for agents running older versions that have
         IP-specific rules.
         """
+
+
+class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin):
+    """Agent-side replacement for SecurityGroupServerRpcApi using local data.
+
+    This provides the same methods as SecurityGroupServerRpcApi but it reads
+    from the updates delivered to the push notifications cache rather than
+    calling the server.
+    """
+    def __init__(self, rcache):
+        self.rcache = rcache
+        registry.subscribe(self._clear_child_sg_rules, 'SecurityGroup',
+                           events.AFTER_DELETE)
+        registry.subscribe(self._add_child_sg_rules, 'SecurityGroup',
+                           events.AFTER_UPDATE)
+        # set this attr so agent can adjust the timeout of the client
+        self.client = resources_rpc.ResourcesPullRpcApi().client
+
+    def register_legacy_sg_notification_callbacks(self, sg_agent):
+        self._sg_agent = sg_agent
+        registry.subscribe(self._handle_sg_rule_delete,
+                           'SecurityGroupRule', events.AFTER_DELETE)
+        registry.subscribe(self._handle_sg_rule_update,
+                           'SecurityGroupRule', events.AFTER_UPDATE)
+        registry.subscribe(self._handle_sg_member_delete,
+                           'Port', events.AFTER_DELETE)
+        registry.subscribe(self._handle_sg_member_update,
+                           'Port', events.AFTER_UPDATE)
+
+    def security_group_info_for_devices(self, context, devices):
+        ports = self._get_devices_info(context, devices)
+        result = self.security_group_info_for_ports(context, ports)
+        return result
+
+    def security_group_rules_for_devices(self, context, devices):
+        # this is the legacy method that should never be called since
+        # security_group_info_for_devices will never throw an unsupported
+        # error.
+        raise NotImplementedError()
+
+    def _add_child_sg_rules(self, rtype, event, trigger, context, updated,
+                            **kwargs):
+        # whenever we receive a full security group, add all child rules
+        # because the server won't emit events for the individual rules on
+        # creation.
+        for rule in updated.rules:
+            self.rcache.record_resource_update(context, 'SecurityGroupRule',
+                                               rule)
+
+    def _clear_child_sg_rules(self, rtype, event, trigger, context, existing,
+                              **kwargs):
+        if not existing:
+            return
+        # the server can delete an entire security group without notifying
+        # about the security group rules. so we need to emulate a rule deletion
+        # when a security group is removed.
+        filters = {'security_group_id': (existing.id, )}
+        for rule in self.rcache.get_resources('SecurityGroupRule', filters):
+            self.rcache.record_resource_delete(context, 'SecurityGroupRule',
+                                               rule.id)
+
+    def _handle_sg_rule_delete(self, rtype, event, trigger, context, existing,
+                               **kwargs):
+        if not existing:
+            return
+        sg_id = existing.security_group_id
+        self._sg_agent.security_groups_rule_updated([sg_id])
+
+    def _handle_sg_rule_update(self, rtype, event, trigger, context, existing,
+                               updated, **kwargs):
+        sg_id = updated.security_group_id
+        self._sg_agent.security_groups_rule_updated([sg_id])
+
+    def _handle_sg_member_delete(self, rtype, event, trigger, context,
+                                 existing, **kwargs):
+        # received on port delete
+        sgs = set(existing.security_group_ids) if existing else set()
+        if sgs:
+            self._sg_agent.security_groups_member_updated(sgs)
+
+    def _handle_sg_member_update(self, rtype, event, trigger, context,
+                                 existing, updated, changed_fields, **kwargs):
+        # received on port update
+        sgs = set(existing.security_group_ids) if existing else set()
+        if not changed_fields.intersection({'security_group_ids', 'fixed_ips',
+                                            'allowed_address_pairs'}):
+            # none of the relevant fields to SG calculations changed
+            return
+        sgs.update({sg_id for sg_id in updated.security_group_ids})
+        if sgs:
+            self._sg_agent.security_groups_member_updated(sgs)
+
+    def _get_devices_info(self, context, devices):
+        # NOTE(kevinbenton): this format is required by the sg code, it is
+        # defined in get_port_from_device and mimics
+        # make_port_dict_with_security_groups in ML2 db
+        result = {}
+        for device in devices:
+            ovo = self.rcache.get_resource_by_id('Port', device)
+            if not ovo:
+                continue
+            port = ovo.to_dict()
+            # the caller expects trusted ports to be excluded from the result
+            if net.is_port_trusted(port):
+                continue
+
+            port['security_groups'] = list(ovo.security_group_ids)
+            port['security_group_rules'] = []
+            port['security_group_source_groups'] = []
+            port['fixed_ips'] = [str(f['ip_address'])
+                                 for f in port['fixed_ips']]
+            # NOTE(kevinbenton): this id==device is only safe for OVS.
+            # a lookup will be required for linux bridge and others that
+            # don't have the full port UUID
+            port['device'] = port['id']
+            result[device] = port
+        return result
+
+    def _select_ips_for_remote_group(self, context, remote_group_ids):
+        if not remote_group_ids:
+            return {}
+        ips_by_group = {rg: set() for rg in remote_group_ids}
+
+        filters = {'security_group_ids': tuple(remote_group_ids)}
+        for p in self.rcache.get_resources('Port', filters):
+            port_ips = [str(addr.ip_address)
+                        for addr in p.fixed_ips + p.allowed_address_pairs]
+            for sg_id in p.security_group_ids:
+                if sg_id in ips_by_group:
+                    ips_by_group[sg_id].update(set(port_ips))
+        return ips_by_group
+
+    def _select_rules_for_ports(self, context, ports):
+        if not ports:
+            return []
+        results = []
+        sg_ids = set((sg_id for p in ports.values()
+                      for sg_id in p['security_group_ids']))
+        rules_by_sgid = collections.defaultdict(list)
+        for sg_id in sg_ids:
+            filters = {'security_group_id': (sg_id, )}
+            for r in self.rcache.get_resources('SecurityGroupRule', filters):
+                rules_by_sgid[r.security_group_id].append(r)
+        for p in ports.values():
+            for sg_id in p['security_group_ids']:
+                for rule in rules_by_sgid[sg_id]:
+                    results.append((p['id'], rule.to_dict()))
+        return results
+
+    def _select_sg_ids_for_ports(self, context, ports):
+        sg_ids = set((sg_id for p in ports.values()
+                      for sg_id in p['security_group_ids']))
+        return [(sg_id, ) for sg_id in sg_ids]
diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
index 2e137daa8e2..8633e090900 100644
--- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
+++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
@@ -89,8 +89,7 @@ def has_zero_prefixlen_address(ip_addresses):
 
 
 @profiler.trace_cls("rpc")
-class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
-                      l2population_rpc.L2populationRpcCallBackTunnelMixin,
+class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
                       dvr_rpc.DVRAgentRpcCallbackMixin):
     '''Implements OVS-based tunneling, VLANs and flat networks.
 
@@ -236,6 +235,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.sg_agent = agent_sg_rpc.SecurityGroupAgentRpc(
             self.context, self.sg_plugin_rpc, defer_refresh_firewall=True,
             integration_bridge=self.int_br)
+        self.sg_plugin_rpc.register_legacy_sg_notification_callbacks(
+            self.sg_agent)
 
         # we default to False to provide backward compat with out of tree
         # firewall drivers that expect the logic that existed on the Neutron
@@ -361,7 +362,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
         # allow us to receive port_update/delete callbacks from the cache
         self.plugin_rpc.register_legacy_notification_callbacks(self)
-        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
+        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerAPIShim(
+            self.plugin_rpc.remote_resource_cache)
         self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
         self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
 
@@ -370,7 +372,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         # Define the listening consumers for the agent
         consumers = [[constants.TUNNEL, topics.UPDATE],
                      [constants.TUNNEL, topics.DELETE],
-                     [topics.SECURITY_GROUP, topics.UPDATE],
                      [topics.DVR, topics.UPDATE]]
         if self.l2_pop:
             consumers.append([topics.L2POPULATION, topics.UPDATE])
diff --git a/neutron/tests/unit/api/rpc/handlers/test_securitygroups_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_securitygroups_rpc.py
index 688ef9b35dc..41d99adff67 100644
--- a/neutron/tests/unit/api/rpc/handlers/test_securitygroups_rpc.py
+++ b/neutron/tests/unit/api/rpc/handlers/test_securitygroups_rpc.py
@@ -13,8 +13,15 @@
 # under the License.
 
 import mock
+from neutron_lib import context
+from oslo_utils import uuidutils
 
+from neutron.agent import resource_cache
+from neutron.api.rpc.callbacks import resources
 from neutron.api.rpc.handlers import securitygroups_rpc
+from neutron import objects
+from neutron.objects import ports
+from neutron.objects import securitygroup
 from neutron.tests import base
 
 
@@ -57,3 +64,94 @@ class SGAgentRpcCallBackMixinTestCase(base.BaseTestCase):
         self.rpc.security_groups_provider_updated(None)
         # this is now a NOOP on the agent side. provider rules don't change
         self.assertFalse(self.rpc.sg_agent.called)
+
+
+class SecurityGroupServerAPIShimTestCase(base.BaseTestCase):
+
+    def setUp(self):
+        super(SecurityGroupServerAPIShimTestCase, self).setUp()
+        objects.register_objects()
+        resource_types = [resources.PORT, resources.SECURITYGROUP,
+                          resources.SECURITYGROUPRULE]
+        self.rcache = resource_cache.RemoteResourceCache(resource_types)
+        # prevent any server lookup attempts
+        mock.patch.object(self.rcache, '_flood_cache_for_query').start()
+        self.shim = securitygroups_rpc.SecurityGroupServerAPIShim(self.rcache)
+        self.sg_agent = mock.Mock()
+        self.shim.register_legacy_sg_notification_callbacks(self.sg_agent)
+        self.ctx = context.get_admin_context()
+
+    def _make_port_ovo(self, ip, **kwargs):
+        attrs = {'id': uuidutils.generate_uuid(),
+                 'network_id': uuidutils.generate_uuid(),
+                 'security_group_ids': set(),
+                 'device_owner': 'compute:None',
+                 'allowed_address_pairs': []}
+        attrs['fixed_ips'] = [ports.IPAllocation(
+            port_id=attrs['id'], subnet_id=uuidutils.generate_uuid(),
+            network_id=attrs['network_id'], ip_address=ip)]
+        attrs.update(**kwargs)
+        p = ports.Port(self.ctx, **attrs)
+        self.rcache.record_resource_update(self.ctx, 'Port', p)
+        return p
+
+    def _make_security_group_ovo(self, **kwargs):
+        attrs = {'id': uuidutils.generate_uuid(), 'revision_number': 1}
+        sg_rule = securitygroup.SecurityGroupRule(
+            id=uuidutils.generate_uuid(),
+            security_group_id=attrs['id'],
+            direction='ingress',
+            ethertype='IPv4', protocol='tcp',
+            port_range_min=400,
+            remote_group_id=attrs['id'],
+            revision_number=1,
+        )
+        attrs['rules'] = [sg_rule]
+        attrs.update(**kwargs)
+        sg = securitygroup.SecurityGroup(self.ctx, **attrs)
+        self.rcache.record_resource_update(self.ctx, 'SecurityGroup', sg)
+        return sg
+
+    def test_sg_parent_ops_affect_rules(self):
+        s1 = self._make_security_group_ovo()
+        filters = {'security_group_id': (s1.id, )}
+        self.assertEqual(
+            s1.rules,
+            self.rcache.get_resources('SecurityGroupRule', filters))
+        self.sg_agent.security_groups_rule_updated.assert_called_once_with(
+            [s1.id])
+        self.sg_agent.security_groups_rule_updated.reset_mock()
+        self.rcache.record_resource_delete(self.ctx, 'SecurityGroup', s1.id)
+        self.assertEqual(
+            [],
+            self.rcache.get_resources('SecurityGroupRule', filters))
+        self.sg_agent.security_groups_rule_updated.assert_called_once_with(
+            [s1.id])
+
+    def test_security_group_info_for_devices(self):
+        s1 = self._make_security_group_ovo()
+        p1 = self._make_port_ovo(ip='1.1.1.1', security_group_ids={s1.id})
+        p2 = self._make_port_ovo(ip='2.2.2.2', security_group_ids={s1.id})
+        p3 = self._make_port_ovo(ip='3.3.3.3', security_group_ids={s1.id},
+                                 device_owner='network:dhcp')
+        ids = [p1.id, p2.id, p3.id]
+        info = self.shim.security_group_info_for_devices(self.ctx, ids)
+        self.assertIn('1.1.1.1', info['sg_member_ips'][s1.id]['IPv4'])
+        self.assertIn('2.2.2.2', info['sg_member_ips'][s1.id]['IPv4'])
+        self.assertIn('3.3.3.3', info['sg_member_ips'][s1.id]['IPv4'])
+        self.assertIn(p1.id, info['devices'].keys())
+        self.assertIn(p2.id, info['devices'].keys())
+        # P3 is a trusted port so it doesn't have rules
+        self.assertNotIn(p3.id, info['devices'].keys())
+        self.assertEqual([s1.id], list(info['security_groups'].keys()))
+
+    def test_sg_member_update_events(self):
+        s1 = self._make_security_group_ovo()
+        p1 = self._make_port_ovo(ip='1.1.1.1', security_group_ids={s1.id})
+        self._make_port_ovo(ip='2.2.2.2', security_group_ids={s1.id})
+        self.sg_agent.security_groups_member_updated.assert_called_with(
+            {s1.id})
+        self.sg_agent.security_groups_member_updated.reset_mock()
+        self.rcache.record_resource_delete(self.ctx, 'Port', p1.id)
+        self.sg_agent.security_groups_member_updated.assert_called_with(
+            {s1.id})