Merge "Use push notification for security groups"
This commit is contained in:
commit
a5180a868b
@ -12,15 +12,21 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
|
||||
from neutron_lib.plugins import directory
|
||||
from neutron_lib.utils import net
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
|
||||
from neutron._i18n import _LW
|
||||
from neutron.api.rpc.handlers import resources_rpc
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.common import constants
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.db import securitygroups_rpc_base as sg_rpc_base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -221,3 +227,156 @@ class SecurityGroupAgentRpcCallbackMixin(object):
|
||||
generates the notification for agents running older versions that have
|
||||
IP-specific rules.
|
||||
"""
|
||||
|
||||
|
||||
class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin):
|
||||
"""Agent-side replacement for SecurityGroupServerRpcApi using local data.
|
||||
|
||||
This provides the same methods as SecurityGroupServerRpcApi but it reads
|
||||
from the updates delivered to the push notifications cache rather than
|
||||
calling the server.
|
||||
"""
|
||||
def __init__(self, rcache):
|
||||
self.rcache = rcache
|
||||
registry.subscribe(self._clear_child_sg_rules, 'SecurityGroup',
|
||||
events.AFTER_DELETE)
|
||||
registry.subscribe(self._add_child_sg_rules, 'SecurityGroup',
|
||||
events.AFTER_UPDATE)
|
||||
# set this attr so agent can adjust the timeout of the client
|
||||
self.client = resources_rpc.ResourcesPullRpcApi().client
|
||||
|
||||
def register_legacy_sg_notification_callbacks(self, sg_agent):
|
||||
self._sg_agent = sg_agent
|
||||
registry.subscribe(self._handle_sg_rule_delete,
|
||||
'SecurityGroupRule', events.AFTER_DELETE)
|
||||
registry.subscribe(self._handle_sg_rule_update,
|
||||
'SecurityGroupRule', events.AFTER_UPDATE)
|
||||
registry.subscribe(self._handle_sg_member_delete,
|
||||
'Port', events.AFTER_DELETE)
|
||||
registry.subscribe(self._handle_sg_member_update,
|
||||
'Port', events.AFTER_UPDATE)
|
||||
|
||||
def security_group_info_for_devices(self, context, devices):
|
||||
ports = self._get_devices_info(context, devices)
|
||||
result = self.security_group_info_for_ports(context, ports)
|
||||
return result
|
||||
|
||||
def security_group_rules_for_devices(self, context, devices):
|
||||
# this is the legacy method that should never be called since
|
||||
# security_group_info_for_devices will never throw an unsupported
|
||||
# error.
|
||||
raise NotImplementedError()
|
||||
|
||||
def _add_child_sg_rules(self, rtype, event, trigger, context, updated,
|
||||
**kwargs):
|
||||
# whenever we receive a full security group, add all child rules
|
||||
# because the server won't emit events for the individual rules on
|
||||
# creation.
|
||||
for rule in updated.rules:
|
||||
self.rcache.record_resource_update(context, 'SecurityGroupRule',
|
||||
rule)
|
||||
|
||||
def _clear_child_sg_rules(self, rtype, event, trigger, context, existing,
|
||||
**kwargs):
|
||||
if not existing:
|
||||
return
|
||||
# the server can delete an entire security group without notifying
|
||||
# about the security group rules. so we need to emulate a rule deletion
|
||||
# when a security group is removed.
|
||||
filters = {'security_group_id': (existing.id, )}
|
||||
for rule in self.rcache.get_resources('SecurityGroupRule', filters):
|
||||
self.rcache.record_resource_delete(context, 'SecurityGroupRule',
|
||||
rule.id)
|
||||
|
||||
def _handle_sg_rule_delete(self, rtype, event, trigger, context, existing,
|
||||
**kwargs):
|
||||
if not existing:
|
||||
return
|
||||
sg_id = existing.security_group_id
|
||||
self._sg_agent.security_groups_rule_updated([sg_id])
|
||||
|
||||
def _handle_sg_rule_update(self, rtype, event, trigger, context, existing,
|
||||
updated, **kwargs):
|
||||
sg_id = updated.security_group_id
|
||||
self._sg_agent.security_groups_rule_updated([sg_id])
|
||||
|
||||
def _handle_sg_member_delete(self, rtype, event, trigger, context,
|
||||
existing, **kwargs):
|
||||
# received on port delete
|
||||
sgs = set(existing.security_group_ids) if existing else set()
|
||||
if sgs:
|
||||
self._sg_agent.security_groups_member_updated(sgs)
|
||||
|
||||
def _handle_sg_member_update(self, rtype, event, trigger, context,
|
||||
existing, updated, changed_fields, **kwargs):
|
||||
# received on port update
|
||||
sgs = set(existing.security_group_ids) if existing else set()
|
||||
if not changed_fields.intersection({'security_group_ids', 'fixed_ips',
|
||||
'allowed_address_pairs'}):
|
||||
# none of the relevant fields to SG calculations changed
|
||||
return
|
||||
sgs.update({sg_id for sg_id in updated.security_group_ids})
|
||||
if sgs:
|
||||
self._sg_agent.security_groups_member_updated(sgs)
|
||||
|
||||
def _get_devices_info(self, context, devices):
|
||||
# NOTE(kevinbenton): this format is required by the sg code, it is
|
||||
# defined in get_port_from_device and mimics
|
||||
# make_port_dict_with_security_groups in ML2 db
|
||||
result = {}
|
||||
for device in devices:
|
||||
ovo = self.rcache.get_resource_by_id('Port', device)
|
||||
if not ovo:
|
||||
continue
|
||||
port = ovo.to_dict()
|
||||
# the caller expects trusted ports to be excluded from the result
|
||||
if net.is_port_trusted(port):
|
||||
continue
|
||||
|
||||
port['security_groups'] = list(ovo.security_group_ids)
|
||||
port['security_group_rules'] = []
|
||||
port['security_group_source_groups'] = []
|
||||
port['fixed_ips'] = [str(f['ip_address'])
|
||||
for f in port['fixed_ips']]
|
||||
# NOTE(kevinbenton): this id==device is only safe for OVS. a lookup
|
||||
# will be required for linux bridge and others that don't have the
|
||||
# full port UUID
|
||||
port['device'] = port['id']
|
||||
result[device] = port
|
||||
return result
|
||||
|
||||
def _select_ips_for_remote_group(self, context, remote_group_ids):
|
||||
if not remote_group_ids:
|
||||
return {}
|
||||
ips_by_group = {rg: set() for rg in remote_group_ids}
|
||||
|
||||
filters = {'security_group_ids': tuple(remote_group_ids)}
|
||||
for p in self.rcache.get_resources('Port', filters):
|
||||
port_ips = [str(addr.ip_address)
|
||||
for addr in p.fixed_ips + p.allowed_address_pairs]
|
||||
for sg_id in p.security_group_ids:
|
||||
if sg_id in ips_by_group:
|
||||
ips_by_group[sg_id].update(set(port_ips))
|
||||
return ips_by_group
|
||||
|
||||
def _select_rules_for_ports(self, context, ports):
|
||||
if not ports:
|
||||
return []
|
||||
results = []
|
||||
sg_ids = set((sg_id for p in ports.values()
|
||||
for sg_id in p['security_group_ids']))
|
||||
rules_by_sgid = collections.defaultdict(list)
|
||||
for sg_id in sg_ids:
|
||||
filters = {'security_group_id': (sg_id, )}
|
||||
for r in self.rcache.get_resources('SecurityGroupRule', filters):
|
||||
rules_by_sgid[r.security_group_id].append(r)
|
||||
for p in ports.values():
|
||||
for sg_id in p['security_group_ids']:
|
||||
for rule in rules_by_sgid[sg_id]:
|
||||
results.append((p['id'], rule.to_dict()))
|
||||
return results
|
||||
|
||||
def _select_sg_ids_for_ports(self, context, ports):
|
||||
sg_ids = set((sg_id for p in ports.values()
|
||||
for sg_id in p['security_group_ids']))
|
||||
return [(sg_id, ) for sg_id in sg_ids]
|
||||
|
@ -89,8 +89,7 @@ def has_zero_prefixlen_address(ip_addresses):
|
||||
|
||||
|
||||
@profiler.trace_cls("rpc")
|
||||
class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
dvr_rpc.DVRAgentRpcCallbackMixin):
|
||||
'''Implements OVS-based tunneling, VLANs and flat networks.
|
||||
|
||||
@ -236,6 +235,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.sg_agent = agent_sg_rpc.SecurityGroupAgentRpc(
|
||||
self.context, self.sg_plugin_rpc, defer_refresh_firewall=True,
|
||||
integration_bridge=self.int_br)
|
||||
self.sg_plugin_rpc.register_legacy_sg_notification_callbacks(
|
||||
self.sg_agent)
|
||||
|
||||
# we default to False to provide backward compat with out of tree
|
||||
# firewall drivers that expect the logic that existed on the Neutron
|
||||
@ -361,7 +362,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
|
||||
# allow us to receive port_update/delete callbacks from the cache
|
||||
self.plugin_rpc.register_legacy_notification_callbacks(self)
|
||||
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
|
||||
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerAPIShim(
|
||||
self.plugin_rpc.remote_resource_cache)
|
||||
self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
|
||||
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
|
||||
|
||||
@ -370,7 +372,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
# Define the listening consumers for the agent
|
||||
consumers = [[constants.TUNNEL, topics.UPDATE],
|
||||
[constants.TUNNEL, topics.DELETE],
|
||||
[topics.SECURITY_GROUP, topics.UPDATE],
|
||||
[topics.DVR, topics.UPDATE]]
|
||||
if self.l2_pop:
|
||||
consumers.append([topics.L2POPULATION, topics.UPDATE])
|
||||
|
@ -13,8 +13,15 @@
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from neutron_lib import context
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.agent import resource_cache
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.handlers import securitygroups_rpc
|
||||
from neutron import objects
|
||||
from neutron.objects import ports
|
||||
from neutron.objects import securitygroup
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
@ -57,3 +64,94 @@ class SGAgentRpcCallBackMixinTestCase(base.BaseTestCase):
|
||||
self.rpc.security_groups_provider_updated(None)
|
||||
# this is now a NOOP on the agent side. provider rules don't change
|
||||
self.assertFalse(self.rpc.sg_agent.called)
|
||||
|
||||
|
||||
class SecurityGroupServerAPIShimTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(SecurityGroupServerAPIShimTestCase, self).setUp()
|
||||
objects.register_objects()
|
||||
resource_types = [resources.PORT, resources.SECURITYGROUP,
|
||||
resources.SECURITYGROUPRULE]
|
||||
self.rcache = resource_cache.RemoteResourceCache(resource_types)
|
||||
# prevent any server lookup attempts
|
||||
mock.patch.object(self.rcache, '_flood_cache_for_query').start()
|
||||
self.shim = securitygroups_rpc.SecurityGroupServerAPIShim(self.rcache)
|
||||
self.sg_agent = mock.Mock()
|
||||
self.shim.register_legacy_sg_notification_callbacks(self.sg_agent)
|
||||
self.ctx = context.get_admin_context()
|
||||
|
||||
def _make_port_ovo(self, ip, **kwargs):
|
||||
attrs = {'id': uuidutils.generate_uuid(),
|
||||
'network_id': uuidutils.generate_uuid(),
|
||||
'security_group_ids': set(),
|
||||
'device_owner': 'compute:None',
|
||||
'allowed_address_pairs': []}
|
||||
attrs['fixed_ips'] = [ports.IPAllocation(
|
||||
port_id=attrs['id'], subnet_id=uuidutils.generate_uuid(),
|
||||
network_id=attrs['network_id'], ip_address=ip)]
|
||||
attrs.update(**kwargs)
|
||||
p = ports.Port(self.ctx, **attrs)
|
||||
self.rcache.record_resource_update(self.ctx, 'Port', p)
|
||||
return p
|
||||
|
||||
def _make_security_group_ovo(self, **kwargs):
|
||||
attrs = {'id': uuidutils.generate_uuid(), 'revision_number': 1}
|
||||
sg_rule = securitygroup.SecurityGroupRule(
|
||||
id=uuidutils.generate_uuid(),
|
||||
security_group_id=attrs['id'],
|
||||
direction='ingress',
|
||||
ethertype='IPv4', protocol='tcp',
|
||||
port_range_min=400,
|
||||
remote_group_id=attrs['id'],
|
||||
revision_number=1,
|
||||
)
|
||||
attrs['rules'] = [sg_rule]
|
||||
attrs.update(**kwargs)
|
||||
sg = securitygroup.SecurityGroup(self.ctx, **attrs)
|
||||
self.rcache.record_resource_update(self.ctx, 'SecurityGroup', sg)
|
||||
return sg
|
||||
|
||||
def test_sg_parent_ops_affect_rules(self):
|
||||
s1 = self._make_security_group_ovo()
|
||||
filters = {'security_group_id': (s1.id, )}
|
||||
self.assertEqual(
|
||||
s1.rules,
|
||||
self.rcache.get_resources('SecurityGroupRule', filters))
|
||||
self.sg_agent.security_groups_rule_updated.assert_called_once_with(
|
||||
[s1.id])
|
||||
self.sg_agent.security_groups_rule_updated.reset_mock()
|
||||
self.rcache.record_resource_delete(self.ctx, 'SecurityGroup', s1.id)
|
||||
self.assertEqual(
|
||||
[],
|
||||
self.rcache.get_resources('SecurityGroupRule', filters))
|
||||
self.sg_agent.security_groups_rule_updated.assert_called_once_with(
|
||||
[s1.id])
|
||||
|
||||
def test_security_group_info_for_devices(self):
|
||||
s1 = self._make_security_group_ovo()
|
||||
p1 = self._make_port_ovo(ip='1.1.1.1', security_group_ids={s1.id})
|
||||
p2 = self._make_port_ovo(ip='2.2.2.2', security_group_ids={s1.id})
|
||||
p3 = self._make_port_ovo(ip='3.3.3.3', security_group_ids={s1.id},
|
||||
device_owner='network:dhcp')
|
||||
ids = [p1.id, p2.id, p3.id]
|
||||
info = self.shim.security_group_info_for_devices(self.ctx, ids)
|
||||
self.assertIn('1.1.1.1', info['sg_member_ips'][s1.id]['IPv4'])
|
||||
self.assertIn('2.2.2.2', info['sg_member_ips'][s1.id]['IPv4'])
|
||||
self.assertIn('3.3.3.3', info['sg_member_ips'][s1.id]['IPv4'])
|
||||
self.assertIn(p1.id, info['devices'].keys())
|
||||
self.assertIn(p2.id, info['devices'].keys())
|
||||
# P3 is a trusted port so it doesn't have rules
|
||||
self.assertNotIn(p3.id, info['devices'].keys())
|
||||
self.assertEqual([s1.id], list(info['security_groups'].keys()))
|
||||
|
||||
def test_sg_member_update_events(self):
|
||||
s1 = self._make_security_group_ovo()
|
||||
p1 = self._make_port_ovo(ip='1.1.1.1', security_group_ids={s1.id})
|
||||
self._make_port_ovo(ip='2.2.2.2', security_group_ids={s1.id})
|
||||
self.sg_agent.security_groups_member_updated.assert_called_with(
|
||||
{s1.id})
|
||||
self.sg_agent.security_groups_member_updated.reset_mock()
|
||||
self.rcache.record_resource_delete(self.ctx, 'Port', p1.id)
|
||||
self.sg_agent.security_groups_member_updated.assert_called_with(
|
||||
{s1.id})
|
||||
|
Loading…
x
Reference in New Issue
Block a user