Merge "Remove SG RPC "use_enhanced_rpc" check."

This commit is contained in:
Zuul 2021-07-07 23:05:57 +00:00 committed by Gerrit Code Review
commit f12df766d5
2 changed files with 76 additions and 87 deletions

View File

@ -25,7 +25,6 @@ from neutron_lib.api.definitions import stateful_security_group as stateful_sg
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from neutron.agent import firewall
from neutron.common import _constants as common_constants
@ -130,26 +129,6 @@ class SecurityGroupAgentRpc(object):
# Stores devices for which firewall should be refreshed when
# deferred refresh is enabled.
self.devices_to_refilter = set()
self._use_enhanced_rpc = None
@property
def use_enhanced_rpc(self):
if self._use_enhanced_rpc is None:
self._use_enhanced_rpc = (
self._check_enhanced_rpc_is_supported_by_server())
return self._use_enhanced_rpc
def _check_enhanced_rpc_is_supported_by_server(self):
try:
self.plugin_rpc.security_group_info_for_devices(
self.context, devices=[])
except oslo_messaging.UnsupportedVersion:
LOG.warning('security_group_info_for_devices rpc call not '
'supported by the server, falling back to old '
'security_group_rules_for_devices which scales '
'worse.')
return False
return True
def skip_if_noopfirewall_or_firewall_disabled(func):
@functools.wraps(func)
@ -177,27 +156,22 @@ class SecurityGroupAgentRpc(object):
@_port_filter_lock
def _apply_port_filter(self, device_ids, update_filter=False):
step = common_constants.AGENT_RES_PROCESSING_STEP
if self.use_enhanced_rpc:
devices = {}
security_groups = {}
security_group_member_ips = {}
for i in range(0, len(device_ids), step):
devices_info = self.plugin_rpc.security_group_info_for_devices(
self.context, list(device_ids)[i:i + step])
devices.update(devices_info['devices'])
security_groups.update(devices_info['security_groups'])
security_group_member_ips.update(devices_info['sg_member_ips'])
else:
devices = self.plugin_rpc.security_group_rules_for_devices(
self.context, list(device_ids))
devices = {}
security_groups = {}
security_group_member_ips = {}
for i in range(0, len(device_ids), step):
devices_info = self.plugin_rpc.security_group_info_for_devices(
self.context, list(device_ids)[i:i + step])
devices.update(devices_info['devices'])
security_groups.update(devices_info['security_groups'])
security_group_member_ips.update(devices_info['sg_member_ips'])
trusted_devices = self._get_trusted_devices(device_ids, devices)
with self.firewall.defer_apply():
if self.use_enhanced_rpc:
LOG.debug("Update security group information for ports %s",
devices.keys())
self._update_security_group_info(
security_groups, security_group_member_ips)
LOG.debug("Update security group information for ports %s",
devices.keys())
self._update_security_group_info(
security_groups, security_group_member_ips)
for device in devices.values():
if update_filter:
LOG.debug("Update port filter for %s", device['device'])
@ -243,8 +217,7 @@ class SecurityGroupAgentRpc(object):
if sec_grp_set & set(device.get(attribute, [])):
devices.append(device['device'])
if devices:
if self.use_enhanced_rpc:
self.firewall.security_group_updated(action_type, sec_grp_set)
self.firewall.security_group_updated(action_type, sec_grp_set)
if self.defer_refresh_firewall:
LOG.debug("Adding %s devices to the list of devices "
"for which firewall needs to be refreshed",
@ -315,7 +288,7 @@ class SecurityGroupAgentRpc(object):
LOG.debug("Preparing device filters for %d new devices",
len(new_devices))
self.prepare_devices_filter(new_devices)
if self.use_enhanced_rpc and updated_devices:
if updated_devices:
self.firewall.security_group_updated('sg_member', [],
updated_devices)
# If a device is both in new and updated devices

View File

@ -894,10 +894,10 @@ class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
super(SecurityGroupAgentRpcTestCase, self).setUp(
defer_refresh_firewall)
rpc = self.agent.plugin_rpc
rpc.security_group_info_for_devices.side_effect = (
oslo_messaging.UnsupportedVersion('1.2'))
rpc.security_group_rules_for_devices.return_value = (
self.firewall.ports)
rpc.security_group_info_for_devices.return_value = {
'devices': self.firewall.ports,
'security_groups': {},
'sg_member_ips': {}}
def test_prepare_and_remove_devices_filter(self):
self.agent.prepare_devices_filter(['fake_device'])
@ -936,7 +936,6 @@ class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
devices_to_filter = {k: {'device': k} for k in range(4, 8)}
all_devices = range(10)
expected_devices = [0, 1, 2, 3, 8, 9]
self.agent._use_enhanced_rpc = True
with mock.patch.object(
self.agent.plugin_rpc,
'security_group_info_for_devices',
@ -967,7 +966,7 @@ class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
self.agent.refresh_firewall.assert_has_calls(
[mock.call.refresh_firewall([self.fake_device['device']])])
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_security_groups_rule_not_updated(self):
self.agent.refresh_firewall = mock.Mock()
@ -982,7 +981,7 @@ class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3'])
self.agent.refresh_firewall.assert_has_calls(
[mock.call.refresh_firewall([self.fake_device['device']])])
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_security_groups_member_not_updated(self):
self.agent.refresh_firewall = mock.Mock()
@ -1242,7 +1241,7 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
def test_security_groups_rule_updated(self):
self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_multiple_security_groups_rule_updated_same_port(self):
with self.add_fake_device(device='fake_device_2',
@ -1252,7 +1251,7 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
self.agent.security_groups_rule_updated(['fake_sgid2'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertNotIn('fake_device_2', self.agent.devices_to_refilter)
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_security_groups_rule_updated_multiple_ports(self):
with self.add_fake_device(device='fake_device_2',
@ -1262,7 +1261,7 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
'fake_sgid2'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_multiple_security_groups_rule_updated_multiple_ports(self):
with self.add_fake_device(device='fake_device_2',
@ -1272,12 +1271,12 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
self.agent.security_groups_rule_updated(['fake_sgid2'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_security_groups_member_updated(self):
self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_multiple_security_groups_member_updated_same_port(self):
with self.add_fake_device(device='fake_device_2',
@ -1290,7 +1289,7 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
'fake_sgid3'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertNotIn('fake_device_2', self.agent.devices_to_refilter)
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_security_groups_member_updated_multiple_ports(self):
with self.add_fake_device(device='fake_device_2',
@ -1299,7 +1298,7 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
self.agent.security_groups_member_updated(['fake_sgid2'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_multiple_security_groups_member_updated_multiple_ports(self):
with self.add_fake_device(device='fake_device_2',
@ -1309,7 +1308,7 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
self.agent.security_groups_member_updated(['fake_sgid2'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_setup_port_filters_new_ports_only(self):
self.agent.prepare_devices_filter = mock.Mock()
@ -1331,7 +1330,7 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_updated_device']))
self.assertFalse(self.agent.prepare_devices_filter.called)
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_setup_port_filter_new_and_updated_ports(self):
self.agent.prepare_devices_filter = mock.Mock()
@ -1344,7 +1343,7 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
set(['fake_new_device']))
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_updated_device']))
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_setup_port_filters_sg_updates_only(self):
self.agent.prepare_devices_filter = mock.Mock()
@ -1397,7 +1396,7 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_device', 'fake_device_2', 'fake_updated_device']))
self.assertFalse(self.agent.prepare_devices_filter.called)
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_setup_port_filters_all_updates(self):
self.agent.prepare_devices_filter = mock.Mock()
@ -1411,7 +1410,7 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
set(['fake_new_device']))
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_device', 'fake_device_2', 'fake_updated_device']))
self.assertFalse(self.firewall.security_group_updated.called)
self.assertTrue(self.firewall.security_group_updated.called)
def test_setup_port_filters_no_update(self):
self.agent.prepare_devices_filter = mock.Mock()
@ -2779,7 +2778,7 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
PHYSDEV_INGRESS = 'physdev-out'
PHYSDEV_EGRESS = 'physdev-in'
def setUp(self, defer_refresh_firewall=False, test_rpc_v1_1=True):
def setUp(self, defer_refresh_firewall=False):
clear_mgrs = lambda: ip_conntrack.CONTRACK_MGRS.clear()
self.addCleanup(clear_mgrs)
clear_mgrs() # clear before start in case other tests didn't clean up
@ -2793,11 +2792,6 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self.rpc = mock.Mock()
self._init_agent(defer_refresh_firewall)
if test_rpc_v1_1:
self.rpc.security_group_info_for_devices.side_effect = (
oslo_messaging.UnsupportedVersion('1.2'))
self.iptables = self.agent.firewall.iptables
self.ipconntrack = self.agent.firewall.ipconntrack
# TODO(jlibosva) Get rid of mocking iptables execute and mock out
@ -2931,9 +2925,6 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self.assertThat(kwargs['process_input'],
matchers.MatchesRegex(expected_regex))
self.assertEqual(exp_fw_sg_updated_call,
self.agent.firewall.security_group_updated.called)
def _replay_iptables(self, v4_filter, v6_filter, raw):
self._register_mock_call(
['iptables-save'], run_as_root=True, privsep_exec=True,
@ -2952,7 +2943,9 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
def test_prepare_remove_port(self):
self.ipconntrack._device_zone_map = {}
self.rpc.security_group_rules_for_devices.return_value = self.devices1
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices1, 'security_groups': {},
'sg_member_ips': {}}
self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1,
IPTABLES_RAW_BRIDGE_NET_1)
self._replay_iptables(IPTABLES_FILTER_EMPTY, IPTABLES_FILTER_V6_EMPTY,
@ -2964,7 +2957,9 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self._verify_mock_calls()
def test_security_group_member_updated(self):
self.rpc.security_group_rules_for_devices.return_value = self.devices1
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices1, 'security_groups': {},
'sg_member_ips': {}}
self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1,
IPTABLES_RAW_BRIDGE_NET_1)
self._replay_iptables(IPTABLES_FILTER_1_2, IPTABLES_FILTER_V6_1,
@ -2979,10 +2974,14 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
IPTABLES_RAW_DEFAULT)
self.agent.prepare_devices_filter(['tap_port1'])
self.rpc.security_group_rules_for_devices.return_value = self.devices2
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices2, 'security_groups': {},
'sg_member_ips': {}}
self.agent.security_groups_member_updated(['security_group1'])
self.agent.prepare_devices_filter(['tap_port2'])
self.rpc.security_group_rules_for_devices.return_value = self.devices1
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices1, 'security_groups': {},
'sg_member_ips': {}}
self.agent.security_groups_member_updated(['security_group1'])
self.agent.remove_devices_filter(['tap_port2'])
self.agent.remove_devices_filter(['tap_port1'])
@ -2990,7 +2989,9 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self._verify_mock_calls()
def test_security_group_rule_updated(self):
self.rpc.security_group_rules_for_devices.return_value = self.devices2
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices2, 'security_groups': {},
'sg_member_ips': {}}
self._replay_iptables(
IPTABLES_FILTER_2_TRUSTED, IPTABLES_FILTER_V6_2_TRUSTED,
IPTABLES_RAW_BRIDGE_NET_2)
@ -2999,7 +3000,9 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
IPTABLES_RAW_BRIDGE_NET_2)
self.agent.prepare_devices_filter(['tap_port1', 'tap_port3'])
self.rpc.security_group_rules_for_devices.return_value = self.devices3
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices3, 'security_groups': {},
'sg_member_ips': {}}
self.agent.security_groups_rule_updated(['security_group1'])
self._verify_mock_calls()
@ -3009,7 +3012,7 @@ class TestSecurityGroupAgentEnhancedRpcWithIptables(
TestSecurityGroupAgentWithIptables):
def setUp(self, defer_refresh_firewall=False):
super(TestSecurityGroupAgentEnhancedRpcWithIptables, self).setUp(
defer_refresh_firewall=defer_refresh_firewall, test_rpc_v1_1=False)
defer_refresh_firewall=defer_refresh_firewall)
self.sg_info = self.rpc.security_group_info_for_devices
rule1 = [{'direction': 'ingress',
@ -3260,10 +3263,9 @@ class TestSecurityGroupAgentWithOVSIptables(
FIREWALL_DRIVER = FIREWALL_HYBRID_DRIVER
def setUp(self, defer_refresh_firewall=False, test_rpc_v1_1=True):
def setUp(self, defer_refresh_firewall=False):
super(TestSecurityGroupAgentWithOVSIptables, self).setUp(
defer_refresh_firewall,
test_rpc_v1_1)
defer_refresh_firewall)
def _init_agent(self, defer_refresh_firewall):
self.agent = sg_rpc.SecurityGroupAgentRpc(
@ -3275,7 +3277,9 @@ class TestSecurityGroupAgentWithOVSIptables(
def test_prepare_remove_port(self):
self.ipconntrack._device_zone_map = {}
self.rpc.security_group_rules_for_devices.return_value = self.devices1
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices1, 'security_groups': {},
'sg_member_ips': {}}
self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1,
IPTABLES_RAW_DEVICE_1)
self._replay_iptables(IPTABLES_FILTER_EMPTY, IPTABLES_FILTER_V6_EMPTY,
@ -3289,7 +3293,9 @@ class TestSecurityGroupAgentWithOVSIptables(
def test_prepare_remove_port_no_ct_zone(self):
self.ipconntrack.get_device_zone = mock.Mock()
self.ipconntrack.get_device_zone.side_effect = [{}, {}]
self.rpc.security_group_rules_for_devices.return_value = self.devices1
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices1, 'security_groups': {},
'sg_member_ips': {}}
self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1,
IPTABLES_RAW_DEFAULT)
self._replay_iptables(IPTABLES_FILTER_EMPTY, IPTABLES_FILTER_V6_EMPTY,
@ -3302,7 +3308,9 @@ class TestSecurityGroupAgentWithOVSIptables(
def test_security_group_member_updated(self):
self.ipconntrack._device_zone_map = {}
self.rpc.security_group_rules_for_devices.return_value = self.devices1
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices1, 'security_groups': {},
'sg_member_ips': {}}
self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1,
IPTABLES_RAW_DEVICE_1)
self._replay_iptables(IPTABLES_FILTER_1_2, IPTABLES_FILTER_V6_1,
@ -3317,10 +3325,14 @@ class TestSecurityGroupAgentWithOVSIptables(
IPTABLES_RAW_DEFAULT)
self.agent.prepare_devices_filter(['tap_port1'])
self.rpc.security_group_rules_for_devices.return_value = self.devices2
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices2, 'security_groups': {},
'sg_member_ips': {}}
self.agent.security_groups_member_updated(['security_group1'])
self.agent.prepare_devices_filter(['tap_port2'])
self.rpc.security_group_rules_for_devices.return_value = self.devices1
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices1, 'security_groups': {},
'sg_member_ips': {}}
self.agent.security_groups_member_updated(['security_group1'])
self.agent.remove_devices_filter(['tap_port2'])
self.agent.remove_devices_filter(['tap_port1'])
@ -3329,7 +3341,9 @@ class TestSecurityGroupAgentWithOVSIptables(
def test_security_group_rule_updated(self):
self.ipconntrack._device_zone_map = {}
self.rpc.security_group_rules_for_devices.return_value = self.devices2
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices2, 'security_groups': {},
'sg_member_ips': {}}
self._replay_iptables(
IPTABLES_FILTER_2_TRUSTED, IPTABLES_FILTER_V6_2_TRUSTED,
IPTABLES_RAW_DEVICE_2)
@ -3338,7 +3352,9 @@ class TestSecurityGroupAgentWithOVSIptables(
IPTABLES_RAW_DEVICE_2)
self.agent.prepare_devices_filter(['tap_port1', 'tap_port3'])
self.rpc.security_group_rules_for_devices.return_value = self.devices3
self.rpc.security_group_info_for_devices.return_value = {
'devices': self.devices3, 'security_groups': {},
'sg_member_ips': {}}
self.agent.security_groups_rule_updated(['security_group1'])
self._verify_mock_calls()