From 8a4540acac511cacb0d4f5680ce285e913f7ff50 Mon Sep 17 00:00:00 2001 From: Cedric Brandily Date: Sun, 1 Mar 2015 23:05:36 +0000 Subject: [PATCH] Replace BaseIPVethTestCase by FakeMachine This change removes BaseIPVethTestCase class and moves Pinger class to allow its use from a fake machine. Change-Id: I0636f11a327e9535828e7b52e60195e52831a0b2 --- neutron/tests/common/machine_fixtures.py | 38 +++++++++++++- neutron/tests/functional/agent/linux/base.py | 13 ----- .../tests/functional/agent/linux/helpers.py | 24 --------- .../agent/linux/test_ebtables_driver.py | 52 ++++++++----------- .../functional/agent/linux/test_ipset.py | 43 ++++++++------- .../functional/agent/linux/test_iptables.py | 43 ++++++++------- .../agent/linux/test_iptables_firewall.py | 10 ++-- .../tests/functional/agent/test_ovs_flows.py | 9 ++-- 8 files changed, 115 insertions(+), 117 deletions(-) diff --git a/neutron/tests/common/machine_fixtures.py b/neutron/tests/common/machine_fixtures.py index 5fa45375255..7cc626c887f 100644 --- a/neutron/tests/common/machine_fixtures.py +++ b/neutron/tests/common/machine_fixtures.py @@ -17,6 +17,31 @@ import fixtures from neutron.agent.linux import ip_lib from neutron.tests.common import net_helpers +from neutron.tests import tools + + +class Pinger(object): + def __init__(self, namespace, timeout=1, max_attempts=1): + self.namespace = namespace + self._timeout = timeout + self._max_attempts = max_attempts + + def _ping_destination(self, dest_address): + ns_ip_wrapper = ip_lib.IPWrapper(self.namespace) + ns_ip_wrapper.netns.execute(['ping', '-c', self._max_attempts, + '-W', self._timeout, dest_address]) + + def assert_ping(self, dst_ip): + self._ping_destination(dst_ip) + + def assert_no_ping(self, dst_ip): + try: + self._ping_destination(dst_ip) + tools.fail("destination ip %(dst_ip)s is replying to ping " + "from namespace %(ns)s, but it shouldn't" % + {'ns': self.namespace, 'dst_ip': dst_ip}) + except RuntimeError: + pass class FakeMachine(fixtures.Fixture): @@ -45,8 +70,9 @@ class FakeMachine(fixtures.Fixture): def setUp(self): super(FakeMachine, self).setUp() - self.namespace = self.useFixture( - net_helpers.NamespaceFixture()).name + ns_fixture = self.useFixture( + net_helpers.NamespaceFixture()) + self.namespace = ns_fixture.name self.port = self.useFixture( net_helpers.PortFixture.get(self.bridge, self.namespace)).port @@ -59,6 +85,14 @@ class FakeMachine(fixtures.Fixture): ns_ip_wrapper = ip_lib.IPWrapper(self.namespace) return ns_ip_wrapper.netns.execute(*args, **kwargs) + def assert_ping(self, dst_ip): + pinger = Pinger(self.namespace) + pinger.assert_ping(dst_ip) + + def assert_no_ping(self, dst_ip): + pinger = Pinger(self.namespace) + pinger.assert_no_ping(dst_ip) + class PeerMachines(fixtures.Fixture): """Create 'amount' peered machines on an ip_cidr. diff --git a/neutron/tests/functional/agent/linux/base.py b/neutron/tests/functional/agent/linux/base.py index 25741b140fa..b2d7b1be26c 100644 --- a/neutron/tests/functional/agent/linux/base.py +++ b/neutron/tests/functional/agent/linux/base.py @@ -14,9 +14,7 @@ import testscenarios -from neutron.agent.linux import ip_lib from neutron.tests import base as tests_base -from neutron.tests.common import machine_fixtures from neutron.tests.common import net_helpers from neutron.tests.functional import base as functional_base @@ -54,14 +52,3 @@ class BaseOVSLinuxTestCase(testscenarios.WithScenarios, BaseLinuxTestCase): def setUp(self): super(BaseOVSLinuxTestCase, self).setUp() self.config(group='OVS', ovsdb_interface=self.ovsdb_interface) - - -class BaseIPVethTestCase(functional_base.BaseSudoTestCase): - - def prepare_veth_pairs(self): - bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge - machines = self.useFixture( - machine_fixtures.PeerMachines(bridge)).machines - self.SRC_ADDRESS = machines[0].ip - self.DST_ADDRESS = machines[1].ip - return [ip_lib.IPWrapper(m.namespace) for m in machines] diff --git a/neutron/tests/functional/agent/linux/helpers.py b/neutron/tests/functional/agent/linux/helpers.py index 1e51a9b81c0..611a423045d 100644 --- a/neutron/tests/functional/agent/linux/helpers.py +++ b/neutron/tests/functional/agent/linux/helpers.py @@ -25,7 +25,6 @@ import fixtures from neutron.agent.common import config from neutron.agent.linux import ip_lib from neutron.agent.linux import utils -from neutron.tests import tools CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20) CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5) @@ -99,29 +98,6 @@ def get_unused_port(used, start=1024, end=65535): return random.choice(list(candidates - used)) -class Pinger(object): - def __init__(self, namespace, timeout=1, max_attempts=1): - self.namespace = namespace - self._timeout = timeout - self._max_attempts = max_attempts - - def _ping_destination(self, dest_address): - self.namespace.netns.execute(['ping', '-c', self._max_attempts, - '-W', self._timeout, dest_address]) - - def assert_ping(self, dst_ip): - self._ping_destination(dst_ip) - - def assert_no_ping(self, dst_ip): - try: - self._ping_destination(dst_ip) - tools.fail("destination ip %(dst_ip)s is replying to ping" - "from namespace %(ns)s, but it shouldn't" % - {'ns': self.namespace.namespace, 'dst_ip': dst_ip}) - except RuntimeError: - pass - - class RootHelperProcess(subprocess.Popen): def __init__(self, cmd, *args, **kwargs): for arg in ('stdin', 'stdout', 'stderr'): diff --git a/neutron/tests/functional/agent/linux/test_ebtables_driver.py b/neutron/tests/functional/agent/linux/test_ebtables_driver.py index 8fc5fe10df3..999cc6762dc 100644 --- a/neutron/tests/functional/agent/linux/test_ebtables_driver.py +++ b/neutron/tests/functional/agent/linux/test_ebtables_driver.py @@ -15,9 +15,9 @@ from neutron.agent.linux import ebtables_driver from neutron.agent.linux import ip_lib -from neutron.agent.linux import utils as linux_utils -from neutron.tests.functional.agent.linux import base -from neutron.tests.functional.agent.linux import helpers +from neutron.tests.common import machine_fixtures +from neutron.tests.common import net_helpers +from neutron.tests.functional import base NO_FILTER_APPLY = ( @@ -70,36 +70,31 @@ FILTER_APPLY_TEMPLATE = ( "COMMIT") -class EbtablesLowLevelTestCase(base.BaseIPVethTestCase): +class EbtablesLowLevelTestCase(base.BaseSudoTestCase): def setUp(self): super(EbtablesLowLevelTestCase, self).setUp() - self.src_ns, self.dst_ns = self.prepare_veth_pairs() - devs = [d for d in self.src_ns.get_devices() if d.name != "lo"] - src_dev_name = devs[0].name - self.ns = self.src_ns.namespace - self.execute = linux_utils.execute - self.pinger = helpers.Pinger(self.src_ns) + + bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge + self.source, self.destination = self.useFixture( + machine_fixtures.PeerMachines(bridge)).machines # Extract MAC and IP address of one of my interfaces - self.mac = self.src_ns.device(src_dev_name).link.address - addr = [a for a in - self.src_ns.device(src_dev_name).addr.list()][0]['cidr'] - self.addr = addr.split("/")[0] + self.mac = self.source.port.link.address + self.addr = self.source.ip # Pick one of the namespaces and setup a bridge for the local ethernet # interface there, because ebtables only works on bridged interfaces. - self.src_ns.netns.execute("brctl addbr mybridge".split()) - self.src_ns.netns.execute(("brctl addif mybridge %s" % src_dev_name). - split()) + self.source.execute(['brctl', 'addbr', 'mybridge']) + self.source.execute( + ['brctl', 'addif', 'mybridge', self.source.port.name]) # Take the IP addrss off one of the interfaces and apply it to the # bridge interface instead. - dev_source = ip_lib.IPDevice(src_dev_name, self.src_ns.namespace) - dev_mybridge = ip_lib.IPDevice("mybridge", self.src_ns.namespace) - dev_source.addr.delete(addr) + self.source.port.addr.delete(self.source.ip_cidr) + dev_mybridge = ip_lib.IPDevice("mybridge", self.source.namespace) dev_mybridge.link.set_up() - dev_mybridge.addr.add(addr) + dev_mybridge.addr.add(self.source.ip_cidr) def _test_basic_port_filter_wrong_mac(self): # Setup filter with wrong IP/MAC address pair. Basic filters only allow @@ -108,15 +103,13 @@ class EbtablesLowLevelTestCase(base.BaseIPVethTestCase): mac_ip_pair = dict(mac_addr="11:11:11:22:22:22", ip_addr=self.addr) filter_apply = FILTER_APPLY_TEMPLATE % mac_ip_pair ebtables_driver.ebtables_restore(filter_apply, - self.execute, - self.ns) - self.pinger.assert_no_ping(self.DST_ADDRESS) + self.source.execute) + self.source.assert_no_ping(self.destination.ip) # Assure that ping will work once we unfilter the instance ebtables_driver.ebtables_restore(NO_FILTER_APPLY, - self.execute, - self.ns) - self.pinger.assert_ping(self.DST_ADDRESS) + self.source.execute) + self.source.assert_ping(self.destination.ip) def _test_basic_port_filter_correct_mac(self): # Use the correct IP/MAC address pair for this one. @@ -124,10 +117,9 @@ class EbtablesLowLevelTestCase(base.BaseIPVethTestCase): filter_apply = FILTER_APPLY_TEMPLATE % mac_ip_pair ebtables_driver.ebtables_restore(filter_apply, - self.execute, - self.ns) + self.source.execute) - self.pinger.assert_ping(self.DST_ADDRESS) + self.source.assert_ping(self.destination.ip) def test_ebtables_filtering(self): # Cannot parallelize those tests. Therefore need to call them diff --git a/neutron/tests/functional/agent/linux/test_ipset.py b/neutron/tests/functional/agent/linux/test_ipset.py index 499386423f1..703db22449b 100644 --- a/neutron/tests/functional/agent/linux/test_ipset.py +++ b/neutron/tests/functional/agent/linux/test_ipset.py @@ -13,10 +13,13 @@ # License for the specific language governing permissions and limitations # under the License. +from neutron.agent.linux import ip_lib from neutron.agent.linux import ipset_manager from neutron.agent.linux import iptables_manager +from neutron.tests.common import machine_fixtures +from neutron.tests.common import net_helpers from neutron.tests.functional.agent.linux import base -from neutron.tests.functional.agent.linux import helpers +from neutron.tests.functional import base as functional_base IPSET_SET = 'test-set' IPSET_ETHERTYPE = 'IPv4' @@ -24,20 +27,22 @@ ICMP_ACCEPT_RULE = '-p icmp -m set --match-set %s src -j ACCEPT' % IPSET_SET UNRELATED_IP = '1.1.1.1' -class IpsetBase(base.BaseIPVethTestCase): +class IpsetBase(functional_base.BaseSudoTestCase): def setUp(self): super(IpsetBase, self).setUp() - self.src_ns, self.dst_ns = self.prepare_veth_pairs() - self.ipset = self._create_ipset_manager_and_set(self.dst_ns, - IPSET_SET) + bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge + self.source, self.destination = self.useFixture( + machine_fixtures.PeerMachines(bridge)).machines + + self.ipset = self._create_ipset_manager_and_set( + ip_lib.IPWrapper(self.destination.namespace), IPSET_SET) self.dst_iptables = iptables_manager.IptablesManager( - namespace=self.dst_ns.namespace) + namespace=self.destination.namespace) self._add_iptables_ipset_rules(self.dst_iptables) - self.pinger = helpers.Pinger(self.src_ns) def _create_ipset_manager_and_set(self, dst_ns, set_name): ipset = ipset_manager.IpsetManager( @@ -61,28 +66,28 @@ class IpsetBase(base.BaseIPVethTestCase): class IpsetManagerTestCase(IpsetBase): def test_add_member_allows_ping(self): - self.pinger.assert_no_ping(self.DST_ADDRESS) - self.ipset._add_member_to_set(IPSET_SET, self.SRC_ADDRESS) - self.pinger.assert_ping(self.DST_ADDRESS) + self.source.assert_no_ping(self.destination.ip) + self.ipset._add_member_to_set(IPSET_SET, self.source.ip) + self.source.assert_ping(self.destination.ip) def test_del_member_denies_ping(self): - self.ipset._add_member_to_set(IPSET_SET, self.SRC_ADDRESS) - self.pinger.assert_ping(self.DST_ADDRESS) + self.ipset._add_member_to_set(IPSET_SET, self.source.ip) + self.source.assert_ping(self.destination.ip) - self.ipset._del_member_from_set(IPSET_SET, self.SRC_ADDRESS) - self.pinger.assert_no_ping(self.DST_ADDRESS) + self.ipset._del_member_from_set(IPSET_SET, self.source.ip) + self.source.assert_no_ping(self.destination.ip) def test_refresh_ipset_allows_ping(self): self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP], IPSET_ETHERTYPE) - self.pinger.assert_no_ping(self.DST_ADDRESS) + self.source.assert_no_ping(self.destination.ip) - self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.SRC_ADDRESS], + self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.source.ip], IPSET_ETHERTYPE) - self.pinger.assert_ping(self.DST_ADDRESS) + self.source.assert_ping(self.destination.ip) - self.ipset._refresh_set(IPSET_SET, [self.SRC_ADDRESS, UNRELATED_IP], + self.ipset._refresh_set(IPSET_SET, [self.source.ip, UNRELATED_IP], IPSET_ETHERTYPE) - self.pinger.assert_ping(self.DST_ADDRESS) + self.source.assert_ping(self.destination.ip) def test_destroy_ipset_set(self): self.assertRaises(RuntimeError, self.ipset._destroy, IPSET_SET) diff --git a/neutron/tests/functional/agent/linux/test_iptables.py b/neutron/tests/functional/agent/linux/test_iptables.py index e3805897127..43a8dc7b4f4 100644 --- a/neutron/tests/functional/agent/linux/test_iptables.py +++ b/neutron/tests/functional/agent/linux/test_iptables.py @@ -16,15 +16,19 @@ import os.path import testtools +from neutron.agent.linux import ip_lib from neutron.agent.linux import iptables_manager from neutron.agent.linux import utils from neutron.tests import base +from neutron.tests.common import machine_fixtures +from neutron.tests.common import net_helpers from neutron.tests.functional.agent.linux import base as linux_base from neutron.tests.functional.agent.linux.bin import ipt_binname from neutron.tests.functional.agent.linux import helpers +from neutron.tests.functional import base as functional_base -class IptablesManagerTestCase(linux_base.BaseIPVethTestCase): +class IptablesManagerTestCase(functional_base.BaseSudoTestCase): DIRECTION_CHAIN_MAPPER = {'ingress': 'INPUT', 'egress': 'OUTPUT'} PROTOCOL_BLOCK_RULE = '-p %s -j DROP' @@ -32,17 +36,21 @@ class IptablesManagerTestCase(linux_base.BaseIPVethTestCase): def setUp(self): super(IptablesManagerTestCase, self).setUp() - self.client_ns, self.server_ns = self.prepare_veth_pairs() + + bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge + self.client, self.server = self.useFixture( + machine_fixtures.PeerMachines(bridge)).machines + self.client_fw, self.server_fw = self.create_firewalls() # The port is used in isolated namespace that precludes possibility of # port conflicts - self.port = helpers.get_free_namespace_port(self.server_ns.namespace) + self.port = helpers.get_free_namespace_port(self.server.namespace) def create_firewalls(self): client_iptables = iptables_manager.IptablesManager( - namespace=self.client_ns.namespace) + namespace=self.client.namespace) server_iptables = iptables_manager.IptablesManager( - namespace=self.server_ns.namespace) + namespace=self.server.namespace) return client_iptables, server_iptables @@ -71,49 +79,48 @@ class IptablesManagerTestCase(linux_base.BaseIPVethTestCase): return chain, rule def _test_with_nc(self, fw_manager, direction, port, udp): - netcat = helpers.NetcatTester(self.client_ns, self.server_ns, - self.DST_ADDRESS, self.port, - run_as_root=True, udp=udp) + netcat = helpers.NetcatTester( + ip_lib.IPWrapper(self.client.namespace), + ip_lib.IPWrapper(self.server.namespace), + self.server.ip, self.port, run_as_root=True, udp=udp) self.addCleanup(netcat.stop_processes) protocol = 'tcp' if udp: protocol = 'udp' self.assertTrue(netcat.test_connectivity()) self.filter_add_rule( - fw_manager, self.DST_ADDRESS, direction, protocol, port) + fw_manager, self.server.ip, direction, protocol, port) with testtools.ExpectedException(RuntimeError): netcat.test_connectivity() self.filter_remove_rule( - fw_manager, self.DST_ADDRESS, direction, protocol, port) + fw_manager, self.server.ip, direction, protocol, port) self.assertTrue(netcat.test_connectivity(True)) def test_icmp(self): - pinger = helpers.Pinger(self.client_ns) - pinger.assert_ping(self.DST_ADDRESS) + self.client.assert_ping(self.server.ip) self.server_fw.ipv4['filter'].add_rule('INPUT', linux_base.ICMP_BLOCK_RULE) self.server_fw.apply() - pinger.assert_no_ping(self.DST_ADDRESS) + self.client.assert_no_ping(self.server.ip) self.server_fw.ipv4['filter'].remove_rule('INPUT', linux_base.ICMP_BLOCK_RULE) self.server_fw.apply() - pinger.assert_ping(self.DST_ADDRESS) + self.client.assert_ping(self.server.ip) def test_mangle_icmp(self): - pinger = helpers.Pinger(self.client_ns) - pinger.assert_ping(self.DST_ADDRESS) + self.client.assert_ping(self.server.ip) self.server_fw.ipv4['mangle'].add_rule('INPUT', linux_base.ICMP_MARK_RULE) self.server_fw.ipv4['filter'].add_rule('INPUT', linux_base.MARKED_BLOCK_RULE) self.server_fw.apply() - pinger.assert_no_ping(self.DST_ADDRESS) + self.client.assert_no_ping(self.server.ip) self.server_fw.ipv4['mangle'].remove_rule('INPUT', linux_base.ICMP_MARK_RULE) self.server_fw.ipv4['filter'].remove_rule('INPUT', linux_base.MARKED_BLOCK_RULE) self.server_fw.apply() - pinger.assert_ping(self.DST_ADDRESS) + self.client.assert_ping(self.server.ip) def test_tcp_input_port(self): self._test_with_nc(self.server_fw, 'ingress', self.port, udp=False) diff --git a/neutron/tests/functional/agent/linux/test_iptables_firewall.py b/neutron/tests/functional/agent/linux/test_iptables_firewall.py index 4d2c815e8ab..2d82fdd73b8 100644 --- a/neutron/tests/functional/agent/linux/test_iptables_firewall.py +++ b/neutron/tests/functional/agent/linux/test_iptables_firewall.py @@ -16,12 +16,10 @@ # License for the specific language governing permissions and limitations # under the License. -from neutron.agent.linux import ip_lib from neutron.agent.linux import iptables_firewall from neutron.agent import securitygroups_rpc as sg_cfg from neutron.tests.common import machine_fixtures from neutron.tests.common import net_helpers -from neutron.tests.functional.agent.linux import helpers from neutron.tests.functional import base from oslo_config import cfg @@ -63,8 +61,6 @@ class IptablesFirewallTestCase(base.BaseSudoTestCase): # setup firewall on bridge and send packet from src_veth and observe # if sent packet can be observed on dst_veth def test_port_sec_within_firewall(self): - client_ip_wrapper = ip_lib.IPWrapper(self.client.namespace) - pinger = helpers.Pinger(client_ip_wrapper) # update the sg_group to make ping pass sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress', @@ -76,13 +72,13 @@ class IptablesFirewallTestCase(base.BaseSudoTestCase): self.FAKE_SECURITY_GROUP_ID, sg_rules) self.firewall.prepare_port_filter(self.src_port_desc) - pinger.assert_ping(self.server.ip) + self.client.assert_ping(self.server.ip) # modify the src_veth's MAC and test again self._set_src_mac(self.MAC_SPOOFED) - pinger.assert_no_ping(self.server.ip) + self.client.assert_no_ping(self.server.ip) # update the port's port_security_enabled value and test again self.src_port_desc['port_security_enabled'] = False self.firewall.update_port_filter(self.src_port_desc) - pinger.assert_ping(self.server.ip) + self.client.assert_ping(self.server.ip) diff --git a/neutron/tests/functional/agent/test_ovs_flows.py b/neutron/tests/functional/agent/test_ovs_flows.py index c012880a555..699e4c6ba34 100644 --- a/neutron/tests/functional/agent/test_ovs_flows.py +++ b/neutron/tests/functional/agent/test_ovs_flows.py @@ -15,14 +15,14 @@ from neutron.cmd.sanity import checks from neutron.plugins.openvswitch.agent import ovs_neutron_agent as ovsagt +from neutron.tests.common import machine_fixtures from neutron.tests.common import net_helpers -from neutron.tests.functional.agent.linux import base -from neutron.tests.functional.agent.linux import helpers from neutron.tests.functional.agent import test_ovs_lib +from neutron.tests.functional import base class ARPSpoofTestCase(test_ovs_lib.OVSBridgeTestBase, - base.BaseIPVethTestCase): + base.BaseSudoTestCase): def setUp(self): if not checks.arp_header_match_supported(): @@ -35,7 +35,8 @@ class ARPSpoofTestCase(test_ovs_lib.OVSBridgeTestBase, self.dst_addr = '192.168.0.2' self.src_ns = self._create_namespace() self.dst_ns = self._create_namespace() - self.pinger = helpers.Pinger(self.src_ns, max_attempts=2) + self.pinger = machine_fixtures.Pinger( + self.src_ns.namespace, max_attempts=2) self.src_p = self.useFixture( net_helpers.OVSPortFixture(self.br, self.src_ns.namespace)).port self.dst_p = self.useFixture(