31e1aeb66b
neutron-sanity-check tool was importing neutron.tests.base module, which may be not present on some systems (f.e. RDO splits neutron/tests/ subtree in a separate python-neutron-tests package). It made the tool not usable in some setups. https://bugzilla.redhat.com/show_bug.cgi?id=1374282 This is not the first time when we by mistake import from neutron.tests.* and break distributions. It's time to stop it by proactively forbidding that pattern via a new hacking check. Some functions were moved from neutron.tests.base to neutron.common.utils to fulfill the need requirement. They were moved using debtcollector, no current consumers should be affected. Closes-Bug: #1621782 Change-Id: I790777ddcbd1b02218b3db54ae3d5c931d72d4fa
103 lines
3.9 KiB
Python
103 lines
3.9 KiB
Python
# Copyright (c) 2015 Red Hat, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from neutron.agent.linux import ip_lib
|
|
from neutron.agent.linux import ipset_manager
|
|
from neutron.agent.linux import iptables_manager
|
|
from neutron.common import utils
|
|
from neutron.tests.common import machine_fixtures
|
|
from neutron.tests.common import net_helpers
|
|
from neutron.tests.functional.agent.linux import base
|
|
from neutron.tests.functional import base as functional_base
|
|
|
|
MAX_IPSET_NAME_LENGTH = 28
|
|
IPSET_ETHERTYPE = 'IPv4'
|
|
UNRELATED_IP = '1.1.1.1'
|
|
|
|
|
|
class IpsetBase(functional_base.BaseSudoTestCase):
|
|
|
|
def setUp(self):
|
|
super(IpsetBase, self).setUp()
|
|
|
|
bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
|
|
self.source, self.destination = self.useFixture(
|
|
machine_fixtures.PeerMachines(bridge)).machines
|
|
|
|
self.ipset_name = utils.get_rand_name(MAX_IPSET_NAME_LENGTH, 'set-')
|
|
self.icmp_accept_rule = ('-p icmp -m set --match-set %s src -j ACCEPT'
|
|
% self.ipset_name)
|
|
self.ipset = self._create_ipset_manager_and_set(
|
|
ip_lib.IPWrapper(self.destination.namespace), self.ipset_name)
|
|
self.addCleanup(self.ipset._destroy, self.ipset_name)
|
|
self.dst_iptables = iptables_manager.IptablesManager(
|
|
namespace=self.destination.namespace)
|
|
|
|
self._add_iptables_ipset_rules()
|
|
self.addCleanup(self._remove_iptables_ipset_rules)
|
|
|
|
def _create_ipset_manager_and_set(self, dst_ns, set_name):
|
|
ipset = ipset_manager.IpsetManager(
|
|
namespace=dst_ns.namespace)
|
|
|
|
ipset._create_set(set_name, IPSET_ETHERTYPE)
|
|
return ipset
|
|
|
|
def _remove_iptables_ipset_rules(self):
|
|
self.dst_iptables.ipv4['filter'].remove_rule(
|
|
'INPUT', base.ICMP_BLOCK_RULE)
|
|
self.dst_iptables.ipv4['filter'].remove_rule(
|
|
'INPUT', self.icmp_accept_rule)
|
|
self.dst_iptables.apply()
|
|
|
|
def _add_iptables_ipset_rules(self):
|
|
self.dst_iptables.ipv4['filter'].add_rule(
|
|
'INPUT', self.icmp_accept_rule)
|
|
self.dst_iptables.ipv4['filter'].add_rule(
|
|
'INPUT', base.ICMP_BLOCK_RULE)
|
|
self.dst_iptables.apply()
|
|
|
|
|
|
class IpsetManagerTestCase(IpsetBase):
|
|
|
|
def test_add_member_allows_ping(self):
|
|
self.source.assert_no_ping(self.destination.ip)
|
|
self.ipset._add_member_to_set(self.ipset_name, self.source.ip)
|
|
self.source.assert_ping(self.destination.ip)
|
|
|
|
def test_del_member_denies_ping(self):
|
|
self.ipset._add_member_to_set(self.ipset_name, self.source.ip)
|
|
self.source.assert_ping(self.destination.ip)
|
|
|
|
self.ipset._del_member_from_set(self.ipset_name, self.source.ip)
|
|
self.source.assert_no_ping(self.destination.ip)
|
|
|
|
def test_refresh_ipset_allows_ping(self):
|
|
self.ipset._refresh_set(
|
|
self.ipset_name, [UNRELATED_IP], IPSET_ETHERTYPE)
|
|
self.source.assert_no_ping(self.destination.ip)
|
|
|
|
self.ipset._refresh_set(
|
|
self.ipset_name, [UNRELATED_IP, self.source.ip], IPSET_ETHERTYPE)
|
|
self.source.assert_ping(self.destination.ip)
|
|
|
|
self.ipset._refresh_set(
|
|
self.ipset_name, [self.source.ip, UNRELATED_IP], IPSET_ETHERTYPE)
|
|
self.source.assert_ping(self.destination.ip)
|
|
|
|
def test_destroy_ipset_set(self):
|
|
self._remove_iptables_ipset_rules()
|
|
self.ipset._destroy(self.ipset_name)
|