Add devices to update in RPC call security_groups_provider_updated
When a security_groups_provider_updated is received then a global refresh of the firewall is performed. This can be avoided if the plugins pass as parameter of the call the devices that belongs to the network updated. Partially-Implements: blueprint restructure-l2-agent Change-Id: I1e78f3a5ec7e5c5bcba338a0097566422411ef7e
This commit is contained in:
parent
c5188d8bf8
commit
4e71c48bbf
|
@ -202,15 +202,15 @@ class SecurityGroupAgentRpc(object):
|
|||
else:
|
||||
self.refresh_firewall(devices)
|
||||
|
||||
def security_groups_provider_updated(self):
|
||||
def security_groups_provider_updated(self, devices_to_update):
|
||||
LOG.info(_LI("Provider rule updated"))
|
||||
if self.defer_refresh_firewall:
|
||||
# NOTE(salv-orlando): A 'global refresh' might not be
|
||||
# necessary if the subnet for which the provider rules
|
||||
# were updated is known
|
||||
self.global_refresh_firewall = True
|
||||
if devices_to_update is None:
|
||||
self.global_refresh_firewall = True
|
||||
else:
|
||||
self.devices_to_refilter |= set(devices_to_update)
|
||||
else:
|
||||
self.refresh_firewall()
|
||||
self.refresh_firewall(devices_to_update)
|
||||
|
||||
def remove_devices_filter(self, device_ids):
|
||||
if not device_ids:
|
||||
|
|
|
@ -153,12 +153,14 @@ class SecurityGroupAgentRpcApiMixin(object):
|
|||
cctxt.cast(context, 'security_groups_member_updated',
|
||||
security_groups=security_groups)
|
||||
|
||||
def security_groups_provider_updated(self, context):
|
||||
def security_groups_provider_updated(self, context,
|
||||
devices_to_update=None):
|
||||
"""Notify provider updated security groups."""
|
||||
cctxt = self.client.prepare(version=self.SG_RPC_VERSION,
|
||||
cctxt = self.client.prepare(version=1.3,
|
||||
topic=self._get_security_group_topic(),
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'security_groups_provider_updated')
|
||||
cctxt.cast(context, 'security_groups_provider_updated',
|
||||
devices_to_update=devices_to_update)
|
||||
|
||||
|
||||
class SecurityGroupAgentRpcCallbackMixin(object):
|
||||
|
@ -205,6 +207,7 @@ class SecurityGroupAgentRpcCallbackMixin(object):
|
|||
def security_groups_provider_updated(self, context, **kwargs):
|
||||
"""Callback for security group provider update."""
|
||||
LOG.debug("Provider rule updated")
|
||||
devices_to_update = kwargs.get('devices_to_update')
|
||||
if not self.sg_agent:
|
||||
return self._security_groups_agent_not_set()
|
||||
self.sg_agent.security_groups_provider_updated()
|
||||
self.sg_agent.security_groups_provider_updated(devices_to_update)
|
||||
|
|
|
@ -147,22 +147,29 @@ class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
|
|||
occurs and the plugin agent fetches the update provider
|
||||
rule in the other RPC call (security_group_rules_for_devices).
|
||||
"""
|
||||
security_groups_provider_updated = False
|
||||
sg_provider_updated_networks = set()
|
||||
sec_groups = set()
|
||||
for port in ports:
|
||||
if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
|
||||
security_groups_provider_updated = True
|
||||
sg_provider_updated_networks.add(
|
||||
port['network_id'])
|
||||
# For IPv6, provider rule need to be updated in case router
|
||||
# interface is created or updated after VM port is created.
|
||||
elif port['device_owner'] == q_const.DEVICE_OWNER_ROUTER_INTF:
|
||||
if any(netaddr.IPAddress(fixed_ip['ip_address']).version == 6
|
||||
for fixed_ip in port['fixed_ips']):
|
||||
security_groups_provider_updated = True
|
||||
sg_provider_updated_networks.add(
|
||||
port['network_id'])
|
||||
else:
|
||||
sec_groups |= set(port.get(ext_sg.SECURITYGROUPS))
|
||||
|
||||
if security_groups_provider_updated:
|
||||
self.notifier.security_groups_provider_updated(context)
|
||||
if sg_provider_updated_networks:
|
||||
ports_query = context.session.query(models_v2.Port.id).filter(
|
||||
models_v2.Port.network_id.in_(
|
||||
sg_provider_updated_networks)).all()
|
||||
ports_to_update = [p.id for p in ports_query]
|
||||
self.notifier.security_groups_provider_updated(
|
||||
context, ports_to_update)
|
||||
if sec_groups:
|
||||
self.notifier.security_groups_member_updated(
|
||||
context, list(sec_groups))
|
||||
|
|
|
@ -637,7 +637,8 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
# Set RPC API version to 1.0 by default.
|
||||
# history
|
||||
# 1.1 Support Security Group RPC
|
||||
target = oslo_messaging.Target(version='1.1')
|
||||
# 1.3 Added param devices_to_update to security_groups_provider_updated
|
||||
target = oslo_messaging.Target(version='1.3')
|
||||
|
||||
def __init__(self, context, agent, sg_agent):
|
||||
super(LinuxBridgeRpcCallbacks, self).__init__()
|
||||
|
|
|
@ -117,7 +117,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
# 1.0 Initial version
|
||||
# 1.1 Support Security Group RPC
|
||||
# 1.2 Support DVR (Distributed Virtual Router) RPC
|
||||
target = oslo_messaging.Target(version='1.2')
|
||||
# 1.3 Added param devices_to_update to security_groups_provider_updated
|
||||
target = oslo_messaging.Target(version='1.3')
|
||||
|
||||
def __init__(self, integ_br, tun_br, local_ip,
|
||||
bridge_mappings, polling_interval, tunnel_types=None,
|
||||
|
|
|
@ -1186,9 +1186,9 @@ class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
|
|||
|
||||
def test_security_groups_provider_updated(self):
|
||||
self.agent.refresh_firewall = mock.Mock()
|
||||
self.agent.security_groups_provider_updated()
|
||||
self.agent.security_groups_provider_updated(None)
|
||||
self.agent.refresh_firewall.assert_has_calls(
|
||||
[mock.call.refresh_firewall()])
|
||||
[mock.call.refresh_firewall(None)])
|
||||
|
||||
def test_refresh_firewall(self):
|
||||
self.agent.prepare_devices_filter(['fake_port_id'])
|
||||
|
@ -1304,9 +1304,9 @@ class SecurityGroupAgentEnhancedRpcTestCase(
|
|||
|
||||
def test_security_groups_provider_updated_enhanced_rpc(self):
|
||||
self.agent.refresh_firewall = mock.Mock()
|
||||
self.agent.security_groups_provider_updated()
|
||||
self.agent.security_groups_provider_updated(None)
|
||||
self.agent.refresh_firewall.assert_has_calls(
|
||||
[mock.call.refresh_firewall()])
|
||||
[mock.call.refresh_firewall(None)])
|
||||
|
||||
def test_refresh_firewall_enhanced_rpc(self):
|
||||
self.agent.prepare_devices_filter(['fake_port_id'])
|
||||
|
@ -1438,9 +1438,16 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
|
|||
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
|
||||
|
||||
def test_security_groups_provider_updated(self):
|
||||
self.agent.security_groups_provider_updated()
|
||||
self.agent.security_groups_provider_updated(None)
|
||||
self.assertTrue(self.agent.global_refresh_firewall)
|
||||
|
||||
def test_security_groups_provider_updated_devices_specified(self):
|
||||
self.agent.security_groups_provider_updated(
|
||||
['fake_device_1', 'fake_device_2'])
|
||||
self.assertFalse(self.agent.global_refresh_firewall)
|
||||
self.assertIn('fake_device_1', self.agent.devices_to_refilter)
|
||||
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
|
||||
|
||||
def test_setup_port_filters_new_ports_only(self):
|
||||
self.agent.prepare_devices_filter = mock.Mock()
|
||||
self.agent.refresh_firewall = mock.Mock()
|
||||
|
@ -1593,7 +1600,8 @@ class SecurityGroupAgentRpcApiTestCase(base.BaseTestCase):
|
|||
def test_security_groups_provider_updated(self):
|
||||
self.notifier.security_groups_provider_updated(None)
|
||||
self.mock_cast.assert_has_calls(
|
||||
[mock.call(None, 'security_groups_provider_updated')])
|
||||
[mock.call(None, 'security_groups_provider_updated',
|
||||
devices_to_update=None)])
|
||||
|
||||
def test_security_groups_rule_updated(self):
|
||||
self.notifier.security_groups_rule_updated(
|
||||
|
|
|
@ -61,4 +61,4 @@ class SGAgentRpcCallBackMixinTestCase(base.BaseTestCase):
|
|||
def test_security_groups_provider_updated(self):
|
||||
self.rpc.security_groups_provider_updated(None)
|
||||
self.rpc.sg_agent.assert_has_calls(
|
||||
[mock.call.security_groups_provider_updated()])
|
||||
[mock.call.security_groups_provider_updated(None)])
|
||||
|
|
|
@ -468,6 +468,15 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
|||
m_upd.assert_called_once_with(ctx, used_sg)
|
||||
self.assertFalse(p_upd.called)
|
||||
|
||||
def _check_security_groups_provider_updated_args(self, p_upd_mock, net_id):
|
||||
query_params = "network_id=%s" % net_id
|
||||
network_ports = self._list('ports', query_params=query_params)
|
||||
network_ports_ids = [port['id'] for port in network_ports['ports']]
|
||||
self.assertTrue(p_upd_mock.called)
|
||||
p_upd_args = p_upd_mock.call_args
|
||||
ports_ids = p_upd_args[0][1]
|
||||
self.assertEqual(sorted(network_ports_ids), sorted(ports_ids))
|
||||
|
||||
def test_create_ports_bulk_with_sec_grp_member_provider_update(self):
|
||||
ctx = context.get_admin_context()
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
|
@ -496,15 +505,14 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
|||
ports = self.deserialize(self.fmt, res)
|
||||
used_sg = ports['ports'][0]['security_groups']
|
||||
m_upd.assert_called_once_with(ctx, used_sg)
|
||||
p_upd.assert_called_once_with(ctx)
|
||||
|
||||
self._check_security_groups_provider_updated_args(p_upd, net_id)
|
||||
m_upd.reset_mock()
|
||||
p_upd.reset_mock()
|
||||
data[0]['device_owner'] = constants.DEVICE_OWNER_DHCP
|
||||
self._create_bulk_from_list(self.fmt, 'port',
|
||||
data, context=ctx)
|
||||
self.assertFalse(m_upd.called)
|
||||
p_upd.assert_called_once_with(ctx)
|
||||
self._check_security_groups_provider_updated_args(p_upd, net_id)
|
||||
|
||||
def test_create_ports_bulk_with_sec_grp_provider_update_ipv6(self):
|
||||
ctx = context.get_admin_context()
|
||||
|
@ -532,7 +540,8 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
|||
self._create_bulk_from_list(self.fmt, 'port',
|
||||
data, context=ctx)
|
||||
self.assertFalse(m_upd.called)
|
||||
p_upd.assert_called_once_with(ctx)
|
||||
self._check_security_groups_provider_updated_args(
|
||||
p_upd, net_id)
|
||||
|
||||
def test_delete_port_no_notify_in_disassociate_floatingips(self):
|
||||
ctx = context.get_admin_context()
|
||||
|
|
Loading…
Reference in New Issue