Refactor security group rpc call

Refactor rpc call of 'security_group_rules_for_devices' into
'security_group_info_for_devices' to reduce the response
message size and server side processing time. Includes
agent-side auto detection of rpc version, and fall back
to the old one.

Change-Id: If19be8579ca734a899cdd673c919eee8165aaa0e
Implements: blueprint security-group-rules-for-devices-rpc-call-refactor
DocImpact
Co-Authored-By: Miguel Angel Ajo <mangelajo@redhat.com>
This commit is contained in:
shihanzhang 2014-08-04 16:44:31 +08:00 committed by Miguel Angel Ajo
parent 2b3c20591d
commit 4253887a86
5 changed files with 603 additions and 67 deletions

View File

@ -31,6 +31,8 @@ SPOOF_FILTER = 'spoof-filter'
CHAIN_NAME_PREFIX = {INGRESS_DIRECTION: 'i',
EGRESS_DIRECTION: 'o',
SPOOF_FILTER: 's'}
DIRECTION_IP_PREFIX = {'ingress': 'source_ip_prefix',
'egress': 'dest_ip_prefix'}
LINUX_DEV_LEN = 14
@ -48,11 +50,25 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
self._add_fallback_chain_v4v6()
self._defer_apply = False
self._pre_defer_filtered_ports = None
# List of security group rules for ports residing on this host
self.sg_rules = {}
self.pre_sg_rules = None
# List of security group member ips for ports residing on this host
self.sg_members = {}
self.pre_sg_members = None
@property
def ports(self):
return self.filtered_ports
def update_security_group_rules(self, sg_id, sg_rules):
LOG.debug("Update rules of security group (%s)", sg_id)
self.sg_rules[sg_id] = sg_rules
def update_security_group_members(self, sg_id, sg_members):
LOG.debug("Update members of security group (%s)", sg_id)
self.sg_members[sg_id] = sg_members
def prepare_port_filter(self, port):
LOG.debug(_("Preparing device (%s) filter"), port['device'])
self._remove_chains()
@ -250,10 +266,33 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
icmp6_type]
return icmpv6_rules
def _select_sg_rules_for_port(self, port, direction):
sg_ids = port.get('security_groups', [])
port_rules = []
fixed_ips = port.get('fixed_ips', [])
for sg_id in sg_ids:
for rule in self.sg_rules.get(sg_id, []):
if rule['direction'] == direction:
remote_group_id = rule.get('remote_group_id')
if not remote_group_id:
port_rules.append(rule)
continue
ethertype = rule['ethertype']
for ip in self.sg_members[remote_group_id][ethertype]:
if ip in fixed_ips:
continue
ip_rule = rule.copy()
direction_ip_prefix = DIRECTION_IP_PREFIX[direction]
ip_rule[direction_ip_prefix] = str(
netaddr.IPNetwork(ip).cidr)
port_rules.append(ip_rule)
return port_rules
def _add_rule_by_security_group(self, port, direction):
chain_name = self._port_chain_name(port, direction)
# select rules for current direction
security_group_rules = self._select_sgr_by_direction(port, direction)
security_group_rules += self._select_sg_rules_for_port(port, direction)
# split groups by ip version
# for ipv4, iptables command is used
# for ipv6, iptables6 command is used
@ -360,15 +399,43 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
if not self._defer_apply:
self.iptables.defer_apply_on()
self._pre_defer_filtered_ports = dict(self.filtered_ports)
self.pre_sg_members = dict(self.sg_members)
self.pre_sg_rules = dict(self.sg_rules)
self._defer_apply = True
def _remove_unused_security_group_info(self):
need_removed_ipset_chains = set()
need_removed_security_groups = set()
remote_group_ids = set()
cur_group_ids = set()
for port in self.filtered_ports.values():
source_groups = port.get('security_group_source_groups', [])
remote_group_ids.update(source_groups)
groups = port.get('security_groups', [])
cur_group_ids.update(groups)
need_removed_ipset_chains.update(
[x for x in self.pre_sg_members if x not in remote_group_ids])
need_removed_security_groups.update(
[x for x in self.pre_sg_rules if x not in cur_group_ids])
# Remove unused remote security group member ips
for remove_chain_id in need_removed_ipset_chains:
if remove_chain_id in self.sg_members:
self.sg_members.pop(remove_chain_id, None)
# Remove unused security group rules
for remove_group_id in need_removed_security_groups:
if remove_group_id in self.sg_rules:
self.sg_rules.pop(remove_group_id, None)
def filter_defer_apply_off(self):
if self._defer_apply:
self._defer_apply = False
self._remove_chains_apply(self._pre_defer_filtered_ports)
self._pre_defer_filtered_ports = None
self._setup_chains_apply(self.filtered_ports)
self.iptables.defer_apply_off()
self._remove_unused_security_group_info()
self._pre_defer_filtered_ports = None
class OVSHybridIptablesFirewallDriver(IptablesFirewallDriver):

View File

@ -15,12 +15,16 @@
#
from oslo.config import cfg
from oslo import messaging
from neutron.common import topics
from neutron.openstack.common.gettextutils import _LW
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
# history
# 1.1 Support Security Group RPC
SG_RPC_VERSION = "1.1"
security_group_opts = [
@ -74,13 +78,22 @@ def disable_security_group_extension_by_config(aliases):
class SecurityGroupServerRpcApiMixin(object):
"""A mix-in that enable SecurityGroup support in plugin rpc."""
def security_group_rules_for_devices(self, context, devices):
LOG.debug(_("Get security group rules "
"for devices via rpc %r"), devices)
return self.call(context,
self.make_msg('security_group_rules_for_devices',
devices=devices),
version=SG_RPC_VERSION)
version='1.1')
def security_group_info_for_devices(self, context, devices):
LOG.debug("Get security group information for devices via rpc %r",
devices)
return self.call(context,
self.make_msg('security_group_info_for_devices',
devices=devices),
version='1.2')
class SecurityGroupAgentRpcCallbackMixin(object):
@ -149,16 +162,58 @@ class SecurityGroupAgentRpcMixin(object):
self.devices_to_refilter = set()
# Flag raised when a global refresh is needed
self.global_refresh_firewall = False
self._use_enhanced_rpc = None
@property
def use_enhanced_rpc(self):
if self._use_enhanced_rpc is None:
self._use_enhanced_rpc = (
self._check_enhanced_rpc_is_supported_by_server())
return self._use_enhanced_rpc
def _check_enhanced_rpc_is_supported_by_server(self):
try:
self.plugin_rpc.security_group_info_for_devices(
self.context, devices=[])
except messaging.UnsupportedVersion:
LOG.warning(_LW('security_group_info_for_devices rpc call not '
'supported by the server, falling back to old '
'security_group_rules_for_devices which scales '
'worse.'))
return False
return True
def prepare_devices_filter(self, device_ids):
if not device_ids:
return
LOG.info(_("Preparing filters for devices %s"), device_ids)
devices = self.plugin_rpc.security_group_rules_for_devices(
self.context, list(device_ids))
if self.use_enhanced_rpc:
devices_info = self.plugin_rpc.security_group_info_for_devices(
self.context, list(device_ids))
devices = devices_info['devices']
security_groups = devices_info['security_groups']
security_group_member_ips = devices_info['sg_member_ips']
else:
devices = self.plugin_rpc.security_group_rules_for_devices(
self.context, list(device_ids))
with self.firewall.defer_apply():
for device in devices.values():
self.firewall.prepare_port_filter(device)
if self.use_enhanced_rpc:
LOG.debug("Update security group information for ports %s",
devices.keys())
self._update_security_group_info(
security_groups, security_group_member_ips)
def _update_security_group_info(self, security_groups,
security_group_member_ips):
LOG.debug("Update security group information")
for sg_id, sg_rules in security_groups.items():
self.firewall.update_security_group_rules(sg_id, sg_rules)
for remote_sg_id, member_ips in security_group_member_ips.items():
self.firewall.update_security_group_members(
remote_sg_id, member_ips)
def security_groups_rule_updated(self, security_groups):
LOG.info(_("Security group "
@ -217,12 +272,25 @@ class SecurityGroupAgentRpcMixin(object):
if not device_ids:
LOG.info(_("No ports here to refresh firewall"))
return
devices = self.plugin_rpc.security_group_rules_for_devices(
self.context, device_ids)
if self.use_enhanced_rpc:
devices_info = self.plugin_rpc.security_group_info_for_devices(
self.context, device_ids)
devices = devices_info['devices']
security_groups = devices_info['security_groups']
security_group_member_ips = devices_info['sg_member_ips']
else:
devices = self.plugin_rpc.security_group_rules_for_devices(
self.context, device_ids)
with self.firewall.defer_apply():
for device in devices.values():
LOG.debug(_("Update port filter for %s"), device['device'])
self.firewall.update_port_filter(device)
if self.use_enhanced_rpc:
LOG.debug("Update security group information for ports %s",
devices.keys())
self._update_security_group_info(
security_groups, security_group_member_ips)
def firewall_refresh_needed(self):
return self.global_refresh_firewall or self.devices_to_refilter

View File

@ -28,15 +28,27 @@ class SecurityGroupServerRpcCallback(n_rpc.RpcCallback):
# API version history:
# 1.1 - Initial version
# 1.2 - security_group_info_for_devices introduced as an optimization
# NOTE: RPC_API_VERSION must not be overridden in subclasses
# to keep RPC API version consistent across plugins.
RPC_API_VERSION = '1.1'
RPC_API_VERSION = '1.2'
@property
def plugin(self):
return manager.NeutronManager.get_plugin()
def _get_devices_info(self, devices):
devices_info = {}
for device in devices:
port = self.plugin.get_port_from_device(device)
if not port:
continue
if port['device_owner'].startswith('network:'):
continue
devices_info[port['id']] = port
return devices_info
def security_group_rules_for_devices(self, context, **kwargs):
"""Callback method to return security group rules for each port.
@ -46,14 +58,21 @@ class SecurityGroupServerRpcCallback(n_rpc.RpcCallback):
:params devices: list of devices
:returns: port correspond to the devices with security group rules
"""
devices = kwargs.get('devices')
ports = {}
for device in devices:
port = self.plugin.get_port_from_device(device)
if not port:
continue
if port['device_owner'].startswith('network:'):
continue
ports[port['id']] = port
devices_info = kwargs.get('devices')
ports = self._get_devices_info(devices_info)
return self.plugin.security_group_rules_for_ports(context, ports)
def security_group_info_for_devices(self, context, **kwargs):
"""Return security group information for requested devices.
:params devices: list of devices
:returns:
sg_info{
'security_groups': {sg_id: [rule1, rule2]}
'sg_member_ips': {sg_id: {'IPv4': [], 'IPv6': []}}
'devices': {device_id: {device_info}}
}
"""
devices_info = kwargs.get('devices')
ports = self._get_devices_info(devices_info)
return self.plugin.security_group_info_for_ports(context, ports)

View File

@ -149,6 +149,69 @@ class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))
def security_group_info_for_ports(self, context, ports):
sg_info = {'devices': ports,
'security_groups': {},
'sg_member_ips': {}}
rules_in_db = self._select_rules_for_ports(context, ports)
remote_security_group_info = {}
for (binding, rule_in_db) in rules_in_db:
port_id = binding['port_id']
remote_gid = rule_in_db.get('remote_group_id')
security_group_id = rule_in_db.get('security_group_id')
ethertype = rule_in_db['ethertype']
if ('security_group_source_groups'
not in sg_info['devices'][port_id]):
sg_info['devices'][port_id][
'security_group_source_groups'] = []
if remote_gid:
if (remote_gid
not in sg_info['devices'][port_id][
'security_group_source_groups']):
sg_info['devices'][port_id][
'security_group_source_groups'].append(remote_gid)
if remote_gid not in remote_security_group_info:
remote_security_group_info[remote_gid] = {}
if ethertype not in remote_security_group_info[remote_gid]:
remote_security_group_info[remote_gid][ethertype] = []
direction = rule_in_db['direction']
rule_dict = {
'direction': direction,
'ethertype': ethertype}
for key in ('protocol', 'port_range_min', 'port_range_max',
'remote_ip_prefix', 'remote_group_id'):
if rule_in_db.get(key):
if key == 'remote_ip_prefix':
direction_ip_prefix = DIRECTION_IP_PREFIX[direction]
rule_dict[direction_ip_prefix] = rule_in_db[key]
continue
rule_dict[key] = rule_in_db[key]
if security_group_id not in sg_info['security_groups']:
sg_info['security_groups'][security_group_id] = []
if rule_dict not in sg_info['security_groups'][security_group_id]:
sg_info['security_groups'][security_group_id].append(
rule_dict)
sg_info['sg_member_ips'] = remote_security_group_info
# the provider rules do not belong to any security group, so these
# rules still reside in sg_info['devices'] [port_id]
self._apply_provider_rule(context, sg_info['devices'])
return self._get_security_group_member_ips(context, sg_info)
def _get_security_group_member_ips(self, context, sg_info):
ips = self._select_ips_for_remote_group(
context, sg_info['sg_member_ips'].keys())
for sg_id, member_ips in ips.items():
for ip in member_ips:
ethertype = 'IPv%d' % netaddr.IPAddress(ip).version
if ip not in sg_info['sg_member_ips'][sg_id][ethertype]:
sg_info['sg_member_ips'][sg_id][ethertype].append(ip)
return sg_info
def _select_rules_for_ports(self, context, ports):
if not ports:
return []

View File

@ -17,6 +17,7 @@ import contextlib
import mock
from oslo.config import cfg
from oslo import messaging
from testtools import matchers
import webob.exc
@ -396,6 +397,64 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', port_id1)
self._delete('ports', port_id2)
def test_security_group_info_for_devices_ipv4_source_group(self):
with self.network() as n:
with contextlib.nested(self.subnet(n),
self.security_group(),
self.security_group()) as (subnet_v4,
sg1,
sg2):
sg1_id = sg1['security_group']['id']
sg2_id = sg2['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
'ingress', const.PROTO_NAME_TCP, '24',
'25', remote_group_id=sg2['security_group']['id'])
rules = {
'security_group_rules': [rule1['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
res1 = self._create_port(
self.fmt, n['network']['id'],
security_groups=[sg1_id])
ports_rest1 = self.deserialize(self.fmt, res1)
port_id1 = ports_rest1['port']['id']
self.rpc.devices = {port_id1: ports_rest1['port']}
devices = [port_id1, 'no_exist_device']
res2 = self._create_port(
self.fmt, n['network']['id'],
security_groups=[sg2_id])
ports_rest2 = self.deserialize(self.fmt, res2)
port_id2 = ports_rest2['port']['id']
ctx = context.get_admin_context()
ports_rpc = self.rpc.security_group_info_for_devices(
ctx, devices=devices)
expected = {
'security_groups': {sg1_id: [
{'direction': 'egress', 'ethertype': const.IPv4},
{'direction': 'egress', 'ethertype': const.IPv6},
{'direction': u'ingress',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 25, 'port_range_min': 24,
'remote_group_id': sg2_id}
]},
'sg_member_ips': {sg2_id: {
'IPv4': [u'10.0.0.3'],
'IPv6': [],
}}
}
self.assertEqual(expected['security_groups'],
ports_rpc['security_groups'])
self.assertEqual(expected['sg_member_ips'][sg2_id]['IPv4'],
ports_rpc['sg_member_ips'][sg2_id]['IPv4'])
self._delete('ports', port_id1)
self._delete('ports', port_id2)
def test_security_group_rules_for_devices_ipv6_ingress(self):
fake_prefix = FAKE_PREFIX[const.IPv6]
fake_gateway = FAKE_IP[const.IPv6]
@ -877,6 +936,8 @@ class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):
'enable_security_group', False,
group='SECURITYGROUP')
agent = sg_rpc.SecurityGroupAgentRpcMixin()
agent.plugin_rpc = mock.Mock()
agent.context = None
agent.init_firewall()
self.assertEqual(agent.firewall.__class__.__name__,
'NoopFirewallDriver')
@ -892,13 +953,16 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
self.agent.context = None
mock.patch('neutron.agent.linux.iptables_manager').start()
self.agent.root_helper = 'sudo'
rpc = mock.Mock()
rpc.security_group_info_for_devices.side_effect = (
messaging.UnsupportedVersion('1.2'))
self.agent.plugin_rpc = rpc
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.firewall = mock.Mock()
firewall_object = firewall_base.FirewallDriver()
self.firewall.defer_apply.side_effect = firewall_object.defer_apply
self.agent.firewall = self.firewall
rpc = mock.Mock()
self.agent.plugin_rpc = rpc
self.fake_device = {'device': 'fake_device',
'security_groups': ['fake_sgid1', 'fake_sgid2'],
'security_group_source_groups': ['fake_sgid2'],
@ -977,6 +1041,141 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
self.firewall.assert_has_calls([])
class SecurityGroupAgentEnhancedRpcTestCase(base.BaseTestCase):
def setUp(self, defer_refresh_firewall=False):
super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp()
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP')
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
mock.patch('neutron.agent.linux.iptables_manager').start()
self.agent.root_helper = 'sudo'
rpc = mock.Mock()
self.agent.plugin_rpc = rpc
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.firewall = mock.Mock()
firewall_object = firewall_base.FirewallDriver()
self.firewall.defer_apply.side_effect = firewall_object.defer_apply
self.agent.firewall = self.firewall
self.fake_device = {'device': 'fake_device',
'security_groups': ['fake_sgid1', 'fake_sgid2'],
'security_group_source_groups': ['fake_sgid2'],
'security_group_rules': [{'security_group_id':
'fake_sgid1',
'remote_group_id':
'fake_sgid2'}]}
fake_devices = {'fake_device': self.fake_device}
fake_sg_info = {
'security_groups': {
'fake_sgid1': [
{'remote_group_id': 'fake_sgid2'}], 'fake_sgid2': []},
'sg_member_ips': {'fake_sgid2': {'IPv4': [], 'IPv6': []}},
'devices': fake_devices}
self.firewall.ports = fake_devices
rpc.security_group_info_for_devices.return_value = fake_sg_info
def test_prepare_and_remove_devices_filter_enhanced_rpc(self):
self.agent.prepare_devices_filter(['fake_device'])
self.agent.remove_devices_filter(['fake_device'])
# these two mock are too log, just use tmp_mock to replace them
tmp_mock1 = mock.call.update_security_group_rules(
'fake_sgid1', [{'remote_group_id': 'fake_sgid2'}])
tmp_mock2 = mock.call.update_security_group_members(
'fake_sgid2', {'IPv4': [], 'IPv6': []})
# ignore device which is not filtered
self.firewall.assert_has_calls([mock.call.defer_apply(),
mock.call.prepare_port_filter(
self.fake_device),
mock.call.update_security_group_rules(
'fake_sgid2', []),
tmp_mock1,
tmp_mock2,
mock.call.defer_apply(),
mock.call.remove_port_filter(
self.fake_device),
])
def test_security_groups_rule_updated_enhanced_rpc(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
self.agent.refresh_firewall.assert_called_once_with(
[self.fake_device['device']])
def test_security_groups_rule_not_updated_enhanced_rpc(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_rule_updated(['fake_sgid3', 'fake_sgid4'])
self.assertFalse(self.agent.refresh_firewall.called)
def test_security_groups_member_updated_enhanced_rpc(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_member_updated(
['fake_sgid2', 'fake_sgid3'])
self.agent.refresh_firewall.assert_called_once_with(
[self.fake_device['device']])
def test_security_groups_member_not_updated_enhanced_rpc(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_member_updated(
['fake_sgid3', 'fake_sgid4'])
self.assertFalse(self.agent.refresh_firewall.called)
def test_security_groups_provider_updated_enhanced_rpc(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.security_groups_provider_updated()
self.agent.refresh_firewall.assert_has_calls(
[mock.call.refresh_firewall()])
def test_refresh_firewall_enhanced_rpc(self):
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.refresh_firewall()
calls = [mock.call.defer_apply(),
mock.call.prepare_port_filter(self.fake_device),
mock.call.update_security_group_rules('fake_sgid2', []),
mock.call.update_security_group_rules(
'fake_sgid1', [{'remote_group_id': 'fake_sgid2'}]),
mock.call.update_security_group_members(
'fake_sgid2', {'IPv4': [], 'IPv6': []}),
mock.call.defer_apply(),
mock.call.update_port_filter(self.fake_device),
mock.call.update_security_group_rules('fake_sgid2', []),
mock.call.update_security_group_rules(
'fake_sgid1', [{'remote_group_id': 'fake_sgid2'}]),
mock.call.update_security_group_members(
'fake_sgid2', {'IPv4': [], 'IPv6': []})]
self.firewall.assert_has_calls(calls)
def test_refresh_firewall_devices_enhanced_rpc(self):
self.agent.prepare_devices_filter(['fake_device'])
self.agent.refresh_firewall([self.fake_device])
calls = [mock.call.defer_apply(),
mock.call.prepare_port_filter(self.fake_device),
mock.call.update_security_group_rules('fake_sgid2', []),
mock.call.update_security_group_rules('fake_sgid1', [
{'remote_group_id': 'fake_sgid2'}]),
mock.call.update_security_group_members('fake_sgid2', {
'IPv4': [], 'IPv6': []
}),
mock.call.defer_apply(),
mock.call.update_port_filter(self.fake_device),
mock.call.update_security_group_rules('fake_sgid2', []),
mock.call.update_security_group_rules('fake_sgid1', [
{'remote_group_id': 'fake_sgid2'}]),
mock.call.update_security_group_members('fake_sgid2', {
'IPv4': [], 'IPv6': []})
]
self.firewall.assert_has_calls(calls)
def test_refresh_firewall_none_enhanced_rpc(self):
self.agent.refresh_firewall([])
self.firewall.assert_has_calls([])
class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
SecurityGroupAgentRpcTestCase):
@ -1198,7 +1397,7 @@ class SecurityGroupServerRpcApiTestCase(base.BaseTestCase):
{'devices': ['fake_device']},
'method': 'security_group_rules_for_devices',
'namespace': None},
version=sg_rpc.SG_RPC_VERSION)])
version='1.1')])
class FakeSGNotifierAPI(n_rpc.RpcProxy,
@ -1309,8 +1508,8 @@ IPTABLES_FILTER_1 = """# Generated by iptables_manager
%(physdev_is_bridged)s -j %(bn)s-i_port1
[0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
[0:0] -A %(bn)s-i_port1 -m state --state RELATED,ESTABLISHED -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
-j RETURN
[0:0] -A %(bn)s-i_port1 -p tcp -m tcp --dport 22 -j RETURN
[0:0] -A %(bn)s-i_port1 -j %(bn)s-sg-fallback
[0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port1 \
@ -1319,8 +1518,8 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-o_port1
[0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port1 \
%(physdev_is_bridged)s -j %(bn)s-o_port1
[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3 -j \
RETURN
[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3/32 \
-j RETURN
[0:0] -A %(bn)s-s_port1 -j DROP
[0:0] -A %(bn)s-o_port1 -p udp -m udp --sport 68 --dport 67 -j RETURN
[0:0] -A %(bn)s-o_port1 -j %(bn)s-s_port1
@ -1360,10 +1559,10 @@ IPTABLES_FILTER_1_2 = """# Generated by iptables_manager
%(physdev_is_bridged)s -j %(bn)s-i_port1
[0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
[0:0] -A %(bn)s-i_port1 -m state --state RELATED,ESTABLISHED -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
-j RETURN
[0:0] -A %(bn)s-i_port1 -p tcp -m tcp --dport 22 -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.4 -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.4/32 -j RETURN
[0:0] -A %(bn)s-i_port1 -j %(bn)s-sg-fallback
[0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port1 \
%(physdev_is_bridged)s -j %(bn)s-sg-chain
@ -1371,8 +1570,8 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-o_port1
[0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port1 \
%(physdev_is_bridged)s -j %(bn)s-o_port1
[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3 -j \
RETURN
[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3/32 \
-j RETURN
[0:0] -A %(bn)s-s_port1 -j DROP
[0:0] -A %(bn)s-o_port1 -p udp -m udp --sport 68 --dport 67 -j RETURN
[0:0] -A %(bn)s-o_port1 -j %(bn)s-s_port1
@ -1416,10 +1615,10 @@ IPTABLES_FILTER_2 = """# Generated by iptables_manager
%(physdev_is_bridged)s -j %(bn)s-i_port1
[0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
[0:0] -A %(bn)s-i_port1 -m state --state RELATED,ESTABLISHED -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
-j RETURN
[0:0] -A %(bn)s-i_port1 -p tcp -m tcp --dport 22 -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.4 -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.4/32 -j RETURN
[0:0] -A %(bn)s-i_port1 -j %(bn)s-sg-fallback
[0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port1 \
%(physdev_is_bridged)s -j %(bn)s-sg-chain
@ -1427,7 +1626,7 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-o_port1
[0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port1 \
%(physdev_is_bridged)s -j %(bn)s-o_port1
[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3 \
[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3/32 \
-j RETURN
[0:0] -A %(bn)s-s_port1 -j DROP
[0:0] -A %(bn)s-o_port1 -p udp -m udp --sport 68 --dport 67 -j RETURN
@ -1443,10 +1642,10 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-i_port2
[0:0] -A %(bn)s-i_port2 -m state --state INVALID -j DROP
[0:0] -A %(bn)s-i_port2 -m state --state RELATED,ESTABLISHED -j RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
-j RETURN
[0:0] -A %(bn)s-i_port2 -p tcp -m tcp --dport 22 -j RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.3 -j RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.3/32 -j RETURN
[0:0] -A %(bn)s-i_port2 -j %(bn)s-sg-fallback
[0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port2 \
%(physdev_is_bridged)s -j %(bn)s-sg-chain
@ -1454,7 +1653,7 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-o_port2
[0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port2 \
%(physdev_is_bridged)s -j %(bn)s-o_port2
[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4 \
[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4/32 \
-j RETURN
[0:0] -A %(bn)s-s_port2 -j DROP
[0:0] -A %(bn)s-o_port2 -p udp -m udp --sport 68 --dport 67 -j RETURN
@ -1497,8 +1696,8 @@ IPTABLES_FILTER_2_2 = """# Generated by iptables_manager
%(physdev_is_bridged)s -j %(bn)s-i_port1
[0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
[0:0] -A %(bn)s-i_port1 -m state --state RELATED,ESTABLISHED -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
-j RETURN
[0:0] -A %(bn)s-i_port1 -p tcp -m tcp --dport 22 -j RETURN
[0:0] -A %(bn)s-i_port1 -j %(bn)s-sg-fallback
[0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port1 \
@ -1507,8 +1706,8 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-o_port1
[0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port1 \
%(physdev_is_bridged)s -j %(bn)s-o_port1
[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3 -j \
RETURN
[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3/32 \
-j RETURN
[0:0] -A %(bn)s-s_port1 -j DROP
[0:0] -A %(bn)s-o_port1 -p udp -m udp --sport 68 --dport 67 -j RETURN
[0:0] -A %(bn)s-o_port1 -j %(bn)s-s_port1
@ -1523,10 +1722,10 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-i_port2
[0:0] -A %(bn)s-i_port2 -m state --state INVALID -j DROP
[0:0] -A %(bn)s-i_port2 -m state --state RELATED,ESTABLISHED -j RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
-j RETURN
[0:0] -A %(bn)s-i_port2 -p tcp -m tcp --dport 22 -j RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.3 -j RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.3/32 -j RETURN
[0:0] -A %(bn)s-i_port2 -j %(bn)s-sg-fallback
[0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port2 \
%(physdev_is_bridged)s -j %(bn)s-sg-chain
@ -1534,8 +1733,8 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-o_port2
[0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port2 \
%(physdev_is_bridged)s -j %(bn)s-o_port2
[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4 -j \
RETURN
[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4/32 \
-j RETURN
[0:0] -A %(bn)s-s_port2 -j DROP
[0:0] -A %(bn)s-o_port2 -p udp -m udp --sport 68 --dport 67 -j RETURN
[0:0] -A %(bn)s-o_port2 -j %(bn)s-s_port2
@ -1577,10 +1776,10 @@ IPTABLES_FILTER_2_3 = """# Generated by iptables_manager
%(physdev_is_bridged)s -j %(bn)s-i_port1
[0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
[0:0] -A %(bn)s-i_port1 -m state --state RELATED,ESTABLISHED -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
-j RETURN
[0:0] -A %(bn)s-i_port1 -p tcp -m tcp --dport 22 -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.4 -j RETURN
[0:0] -A %(bn)s-i_port1 -s 10.0.0.4/32 -j RETURN
[0:0] -A %(bn)s-i_port1 -p icmp -j RETURN
[0:0] -A %(bn)s-i_port1 -j %(bn)s-sg-fallback
[0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port1 \
@ -1589,8 +1788,8 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-o_port1
[0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port1 \
%(physdev_is_bridged)s -j %(bn)s-o_port1
[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3 -j \
RETURN
[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3/32 \
-j RETURN
[0:0] -A %(bn)s-s_port1 -j DROP
[0:0] -A %(bn)s-o_port1 -p udp -m udp --sport 68 --dport 67 -j RETURN
[0:0] -A %(bn)s-o_port1 -j %(bn)s-s_port1
@ -1605,10 +1804,10 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-i_port2
[0:0] -A %(bn)s-i_port2 -m state --state INVALID -j DROP
[0:0] -A %(bn)s-i_port2 -m state --state RELATED,ESTABLISHED -j RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
-j RETURN
[0:0] -A %(bn)s-i_port2 -p tcp -m tcp --dport 22 -j RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.3 -j RETURN
[0:0] -A %(bn)s-i_port2 -s 10.0.0.3/32 -j RETURN
[0:0] -A %(bn)s-i_port2 -p icmp -j RETURN
[0:0] -A %(bn)s-i_port2 -j %(bn)s-sg-fallback
[0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port2 \
@ -1617,8 +1816,8 @@ RETURN
%(physdev_is_bridged)s -j %(bn)s-o_port2
[0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port2 \
%(physdev_is_bridged)s -j %(bn)s-o_port2
[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4 -j \
RETURN
[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4/32 \
-j RETURN
[0:0] -A %(bn)s-s_port2 -j DROP
[0:0] -A %(bn)s-o_port2 -p udp -m udp --sport 68 --dport 67 -j RETURN
[0:0] -A %(bn)s-o_port2 -j %(bn)s-s_port2
@ -1832,7 +2031,13 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self.root_helper = 'sudo'
self.agent.root_helper = 'sudo'
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.rpc = mock.Mock()
self.agent.plugin_rpc = self.rpc
self.rpc.security_group_info_for_devices.side_effect = (
messaging.UnsupportedVersion('1.2'))
self.agent.init_firewall(
defer_refresh_firewall=defer_refresh_firewall)
self.iptables = self.agent.firewall.iptables
# TODO(jlibosva) Get rid of mocking iptables execute and mock out
@ -1846,12 +2051,10 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self.expected_process_inputs = []
self.iptables_execute.side_effect = self.iptables_execute_return_values
self.rpc = mock.Mock()
self.agent.plugin_rpc = self.rpc
rule1 = [{'direction': 'ingress',
'protocol': const.PROTO_NAME_UDP,
'ethertype': const.IPv4,
'source_ip_prefix': '10.0.0.2',
'source_ip_prefix': '10.0.0.2/32',
'source_port_range_min': 67,
'source_port_range_max': 67,
'port_range_min': 68,
@ -1865,7 +2068,7 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
'ethertype': const.IPv4}]
rule2 = rule1[:]
rule2 += [{'direction': 'ingress',
'source_ip_prefix': '10.0.0.4',
'source_ip_prefix': '10.0.0.4/32',
'ethertype': const.IPv4}]
rule3 = rule2[:]
rule3 += [{'direction': 'ingress',
@ -1873,30 +2076,31 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
'ethertype': const.IPv4}]
rule4 = rule1[:]
rule4 += [{'direction': 'ingress',
'source_ip_prefix': '10.0.0.3',
'source_ip_prefix': '10.0.0.3/32',
'ethertype': const.IPv4}]
rule5 = rule4[:]
rule5 += [{'direction': 'ingress',
'protocol': const.PROTO_NAME_ICMP,
'ethertype': const.IPv4}]
self.devices1 = {'tap_port1': self._device('tap_port1',
'10.0.0.3',
'10.0.0.3/32',
'12:34:56:78:9a:bc',
rule1)}
self.devices2 = {'tap_port1': self._device('tap_port1',
'10.0.0.3',
'10.0.0.3/32',
'12:34:56:78:9a:bc',
rule2),
'tap_port2': self._device('tap_port2',
'10.0.0.4',
'10.0.0.4/32',
'12:34:56:78:9a:bd',
rule4)}
self.devices3 = {'tap_port1': self._device('tap_port1',
'10.0.0.3',
'10.0.0.3/32',
'12:34:56:78:9a:bc',
rule3),
'tap_port2': self._device('tap_port2',
'10.0.0.4',
'10.0.0.4/32',
'12:34:56:78:9a:bd',
rule5)}
@ -2008,6 +2212,121 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self._verify_mock_calls()
class TestSecurityGroupAgentEnhancedRpcWithIptables(
TestSecurityGroupAgentWithIptables):
def setUp(self, defer_refresh_firewall=False):
super(TestSecurityGroupAgentEnhancedRpcWithIptables, self).setUp(
defer_refresh_firewall)
self.rpc = mock.Mock()
self.agent.plugin_rpc = self.rpc
self.sg_info = self.rpc.security_group_info_for_devices
self.agent.init_firewall(
defer_refresh_firewall=defer_refresh_firewall)
self.iptables = self.agent.firewall.iptables
self.iptables_execute = mock.patch.object(self.iptables,
"execute").start()
self.iptables_execute_return_values = []
self.expected_call_count = 0
self.expected_calls = []
self.expected_process_inputs = []
self.iptables_execute.side_effect = (
self.iptables_execute_return_values)
rule1 = [{'direction': 'ingress',
'protocol': const.PROTO_NAME_UDP,
'ethertype': const.IPv4,
'source_ip_prefix': '10.0.0.2/32',
'source_port_range_min': 67,
'source_port_range_max': 67,
'port_range_min': 68,
'port_range_max': 68},
{'direction': 'ingress',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_min': 22,
'port_range_max': 22},
{'direction': 'egress',
'ethertype': const.IPv4},
{'direction': 'ingress',
'remote_group_id': 'security_group1',
'ethertype': const.IPv4}]
rule2 = rule1[:]
rule2 += [{'direction': 'ingress',
'protocol': const.PROTO_NAME_ICMP,
'ethertype': const.IPv4}]
devices_info1 = {'tap_port1': self._device('tap_port1',
'10.0.0.3/32',
'12:34:56:78:9a:bc',
[])}
self.devices_info1 = {'security_groups': {'security_group1': rule1},
'sg_member_ips': {
'security_group1': {
'IPv4': ['10.0.0.3/32'], 'IPv6': []}},
'devices': devices_info1}
devices_info2 = {'tap_port1': self._device('tap_port1',
'10.0.0.3/32',
'12:34:56:78:9a:bc',
[]),
'tap_port2': self._device('tap_port2',
'10.0.0.4/32',
'12:34:56:78:9a:bd',
[])}
self.devices_info2 = {'security_groups': {'security_group1': rule1},
'sg_member_ips': {
'security_group1': {
'IPv4': ['10.0.0.3/32', '10.0.0.4/32'],
'IPv6': []}},
'devices': devices_info2}
self.devices_info3 = {'security_groups': {'security_group1': rule2},
'sg_member_ips': {
'security_group1': {
'IPv4': ['10.0.0.3/32', '10.0.0.4/32'],
'IPv6': []}},
'devices': devices_info2}
def test_prepare_remove_port(self):
self.sg_info.return_value = self.devices_info1
self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1)
self._replay_iptables(IPTABLES_FILTER_EMPTY, IPTABLES_FILTER_V6_EMPTY)
self.agent.prepare_devices_filter(['tap_port1'])
self.agent.remove_devices_filter(['tap_port1'])
self._verify_mock_calls()
def test_security_group_member_updated(self):
self.sg_info.return_value = self.devices_info1
self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1)
self._replay_iptables(IPTABLES_FILTER_1_2, IPTABLES_FILTER_V6_1)
self._replay_iptables(IPTABLES_FILTER_2, IPTABLES_FILTER_V6_2)
self._replay_iptables(IPTABLES_FILTER_2_2, IPTABLES_FILTER_V6_2)
self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1)
self._replay_iptables(IPTABLES_FILTER_EMPTY, IPTABLES_FILTER_V6_EMPTY)
self.agent.prepare_devices_filter(['tap_port1'])
self.sg_info.return_value = self.devices_info2
self.agent.security_groups_member_updated(['security_group1'])
self.agent.prepare_devices_filter(['tap_port2'])
self.sg_info.return_value = self.devices_info1
self.agent.security_groups_member_updated(['security_group1'])
self.agent.remove_devices_filter(['tap_port2'])
self.agent.remove_devices_filter(['tap_port1'])
self._verify_mock_calls()
def test_security_group_rule_updated(self):
self.sg_info.return_value = self.devices_info2
self._replay_iptables(IPTABLES_FILTER_2, IPTABLES_FILTER_V6_2)
self._replay_iptables(IPTABLES_FILTER_2_3, IPTABLES_FILTER_V6_2)
self.agent.prepare_devices_filter(['tap_port1', 'tap_port3'])
self.sg_info.return_value = self.devices_info3
self.agent.security_groups_rule_updated(['security_group1'])
self._verify_mock_calls()
class SGNotificationTestMixin():
def test_security_group_rule_updated(self):
name = 'webservers'