Modify ipset functional tests to pass on older machines

Production code uses ipset exclusively in the root namespace,
however functional testing uses ipset in namespace for isolation.
This poses an issue as ipset is not supported in namespaces on
all kernels and distributions (I'm looking at you CentOS/RHEL 7.1).

This patch changes the ipset functional tests to work in the root
namespace while taking care of cleanups.

Change-Id: I08b2f59197ed76e59b2e58b5a10820653e857cda
Closes-Bug: #1460220
This commit is contained in:
Assaf Muller 2015-05-29 19:17:34 -04:00
parent a225e28701
commit 96e2314c8c
1 changed files with 33 additions and 26 deletions

View File

@ -1,4 +1,4 @@
# Copyright (c) 2014 Red Hat, Inc.
# Copyright (c) 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -21,9 +21,8 @@ from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional import base as functional_base
IPSET_SET = 'test-set'
MAX_IPSET_NAME_LENGTH = 28
IPSET_ETHERTYPE = 'IPv4'
ICMP_ACCEPT_RULE = '-p icmp -m set --match-set %s src -j ACCEPT' % IPSET_SET
UNRELATED_IP = '1.1.1.1'
@ -36,13 +35,17 @@ class IpsetBase(functional_base.BaseSudoTestCase):
self.source, self.destination = self.useFixture(
machine_fixtures.PeerMachines(bridge)).machines
self.ipset_name = base.get_rand_name(MAX_IPSET_NAME_LENGTH, 'set-')
self.icmp_accept_rule = ('-p icmp -m set --match-set %s src -j ACCEPT'
% self.ipset_name)
self.ipset = self._create_ipset_manager_and_set(
ip_lib.IPWrapper(self.destination.namespace), IPSET_SET)
ip_lib.IPWrapper(self.destination.namespace), self.ipset_name)
self.addCleanup(self.ipset._destroy, self.ipset_name)
self.dst_iptables = iptables_manager.IptablesManager(
namespace=self.destination.namespace)
self._add_iptables_ipset_rules(self.dst_iptables)
self._add_iptables_ipset_rules()
self.addCleanup(self._remove_iptables_ipset_rules)
def _create_ipset_manager_and_set(self, dst_ns, set_name):
ipset = ipset_manager.IpsetManager(
@ -51,45 +54,49 @@ class IpsetBase(functional_base.BaseSudoTestCase):
ipset._create_set(set_name, IPSET_ETHERTYPE)
return ipset
@staticmethod
def _remove_iptables_ipset_rules(iptables_manager):
iptables_manager.ipv4['filter'].remove_rule('INPUT', ICMP_ACCEPT_RULE)
iptables_manager.apply()
def _remove_iptables_ipset_rules(self):
self.dst_iptables.ipv4['filter'].remove_rule(
'INPUT', base.ICMP_BLOCK_RULE)
self.dst_iptables.ipv4['filter'].remove_rule(
'INPUT', self.icmp_accept_rule)
self.dst_iptables.apply()
@staticmethod
def _add_iptables_ipset_rules(iptables_manager):
iptables_manager.ipv4['filter'].add_rule('INPUT', ICMP_ACCEPT_RULE)
iptables_manager.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
iptables_manager.apply()
def _add_iptables_ipset_rules(self):
self.dst_iptables.ipv4['filter'].add_rule(
'INPUT', self.icmp_accept_rule)
self.dst_iptables.ipv4['filter'].add_rule(
'INPUT', base.ICMP_BLOCK_RULE)
self.dst_iptables.apply()
class IpsetManagerTestCase(IpsetBase):
def test_add_member_allows_ping(self):
self.source.assert_no_ping(self.destination.ip)
self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
self.ipset._add_member_to_set(self.ipset_name, self.source.ip)
self.source.assert_ping(self.destination.ip)
def test_del_member_denies_ping(self):
self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
self.ipset._add_member_to_set(self.ipset_name, self.source.ip)
self.source.assert_ping(self.destination.ip)
self.ipset._del_member_from_set(IPSET_SET, self.source.ip)
self.ipset._del_member_from_set(self.ipset_name, self.source.ip)
self.source.assert_no_ping(self.destination.ip)
def test_refresh_ipset_allows_ping(self):
self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP], IPSET_ETHERTYPE)
self.ipset._refresh_set(
self.ipset_name, [UNRELATED_IP], IPSET_ETHERTYPE)
self.source.assert_no_ping(self.destination.ip)
self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.source.ip],
IPSET_ETHERTYPE)
self.ipset._refresh_set(
self.ipset_name, [UNRELATED_IP, self.source.ip], IPSET_ETHERTYPE)
self.source.assert_ping(self.destination.ip)
self.ipset._refresh_set(IPSET_SET, [self.source.ip, UNRELATED_IP],
IPSET_ETHERTYPE)
self.ipset._refresh_set(
self.ipset_name, [self.source.ip, UNRELATED_IP], IPSET_ETHERTYPE)
self.source.assert_ping(self.destination.ip)
def test_destroy_ipset_set(self):
self.assertRaises(RuntimeError, self.ipset._destroy, IPSET_SET)
self._remove_iptables_ipset_rules(self.dst_iptables)
self.ipset._destroy(IPSET_SET)
self.assertRaises(RuntimeError, self.ipset._destroy, self.ipset_name)
self._remove_iptables_ipset_rules()
self.ipset._destroy(self.ipset_name)