Merge "Modify ipset functional tests to pass on older machines"
This commit is contained in:
commit
26747e5cbf
|
@ -1,4 +1,4 @@
|
|||
# Copyright (c) 2014 Red Hat, Inc.
|
||||
# Copyright (c) 2015 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -21,9 +21,8 @@ from neutron.tests.common import net_helpers
|
|||
from neutron.tests.functional.agent.linux import base
|
||||
from neutron.tests.functional import base as functional_base
|
||||
|
||||
IPSET_SET = 'test-set'
|
||||
MAX_IPSET_NAME_LENGTH = 28
|
||||
IPSET_ETHERTYPE = 'IPv4'
|
||||
ICMP_ACCEPT_RULE = '-p icmp -m set --match-set %s src -j ACCEPT' % IPSET_SET
|
||||
UNRELATED_IP = '1.1.1.1'
|
||||
|
||||
|
||||
|
@ -36,13 +35,17 @@ class IpsetBase(functional_base.BaseSudoTestCase):
|
|||
self.source, self.destination = self.useFixture(
|
||||
machine_fixtures.PeerMachines(bridge)).machines
|
||||
|
||||
self.ipset_name = base.get_rand_name(MAX_IPSET_NAME_LENGTH, 'set-')
|
||||
self.icmp_accept_rule = ('-p icmp -m set --match-set %s src -j ACCEPT'
|
||||
% self.ipset_name)
|
||||
self.ipset = self._create_ipset_manager_and_set(
|
||||
ip_lib.IPWrapper(self.destination.namespace), IPSET_SET)
|
||||
|
||||
ip_lib.IPWrapper(self.destination.namespace), self.ipset_name)
|
||||
self.addCleanup(self.ipset._destroy, self.ipset_name)
|
||||
self.dst_iptables = iptables_manager.IptablesManager(
|
||||
namespace=self.destination.namespace)
|
||||
|
||||
self._add_iptables_ipset_rules(self.dst_iptables)
|
||||
self._add_iptables_ipset_rules()
|
||||
self.addCleanup(self._remove_iptables_ipset_rules)
|
||||
|
||||
def _create_ipset_manager_and_set(self, dst_ns, set_name):
|
||||
ipset = ipset_manager.IpsetManager(
|
||||
|
@ -51,45 +54,49 @@ class IpsetBase(functional_base.BaseSudoTestCase):
|
|||
ipset._create_set(set_name, IPSET_ETHERTYPE)
|
||||
return ipset
|
||||
|
||||
@staticmethod
|
||||
def _remove_iptables_ipset_rules(iptables_manager):
|
||||
iptables_manager.ipv4['filter'].remove_rule('INPUT', ICMP_ACCEPT_RULE)
|
||||
iptables_manager.apply()
|
||||
def _remove_iptables_ipset_rules(self):
|
||||
self.dst_iptables.ipv4['filter'].remove_rule(
|
||||
'INPUT', base.ICMP_BLOCK_RULE)
|
||||
self.dst_iptables.ipv4['filter'].remove_rule(
|
||||
'INPUT', self.icmp_accept_rule)
|
||||
self.dst_iptables.apply()
|
||||
|
||||
@staticmethod
|
||||
def _add_iptables_ipset_rules(iptables_manager):
|
||||
iptables_manager.ipv4['filter'].add_rule('INPUT', ICMP_ACCEPT_RULE)
|
||||
iptables_manager.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
|
||||
iptables_manager.apply()
|
||||
def _add_iptables_ipset_rules(self):
|
||||
self.dst_iptables.ipv4['filter'].add_rule(
|
||||
'INPUT', self.icmp_accept_rule)
|
||||
self.dst_iptables.ipv4['filter'].add_rule(
|
||||
'INPUT', base.ICMP_BLOCK_RULE)
|
||||
self.dst_iptables.apply()
|
||||
|
||||
|
||||
class IpsetManagerTestCase(IpsetBase):
|
||||
|
||||
def test_add_member_allows_ping(self):
|
||||
self.source.assert_no_ping(self.destination.ip)
|
||||
self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
|
||||
self.ipset._add_member_to_set(self.ipset_name, self.source.ip)
|
||||
self.source.assert_ping(self.destination.ip)
|
||||
|
||||
def test_del_member_denies_ping(self):
|
||||
self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
|
||||
self.ipset._add_member_to_set(self.ipset_name, self.source.ip)
|
||||
self.source.assert_ping(self.destination.ip)
|
||||
|
||||
self.ipset._del_member_from_set(IPSET_SET, self.source.ip)
|
||||
self.ipset._del_member_from_set(self.ipset_name, self.source.ip)
|
||||
self.source.assert_no_ping(self.destination.ip)
|
||||
|
||||
def test_refresh_ipset_allows_ping(self):
|
||||
self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP], IPSET_ETHERTYPE)
|
||||
self.ipset._refresh_set(
|
||||
self.ipset_name, [UNRELATED_IP], IPSET_ETHERTYPE)
|
||||
self.source.assert_no_ping(self.destination.ip)
|
||||
|
||||
self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.source.ip],
|
||||
IPSET_ETHERTYPE)
|
||||
self.ipset._refresh_set(
|
||||
self.ipset_name, [UNRELATED_IP, self.source.ip], IPSET_ETHERTYPE)
|
||||
self.source.assert_ping(self.destination.ip)
|
||||
|
||||
self.ipset._refresh_set(IPSET_SET, [self.source.ip, UNRELATED_IP],
|
||||
IPSET_ETHERTYPE)
|
||||
self.ipset._refresh_set(
|
||||
self.ipset_name, [self.source.ip, UNRELATED_IP], IPSET_ETHERTYPE)
|
||||
self.source.assert_ping(self.destination.ip)
|
||||
|
||||
def test_destroy_ipset_set(self):
|
||||
self.assertRaises(RuntimeError, self.ipset._destroy, IPSET_SET)
|
||||
self._remove_iptables_ipset_rules(self.dst_iptables)
|
||||
self.ipset._destroy(IPSET_SET)
|
||||
self.assertRaises(RuntimeError, self.ipset._destroy, self.ipset_name)
|
||||
self._remove_iptables_ipset_rules()
|
||||
self.ipset._destroy(self.ipset_name)
|
||||
|
|
Loading…
Reference in New Issue