Replace BaseIPVethTestCase by FakeMachine

This change removes BaseIPVethTestCase class and moves Pinger class to
allow its use from a fake machine.

Change-Id: I0636f11a327e9535828e7b52e60195e52831a0b2
This commit is contained in:
Cedric Brandily 2015-03-01 23:05:36 +00:00
parent bc1dc8d288
commit 8a4540acac
8 changed files with 115 additions and 117 deletions

View File

@ -17,6 +17,31 @@ import fixtures
from neutron.agent.linux import ip_lib
from neutron.tests.common import net_helpers
from neutron.tests import tools
class Pinger(object):
def __init__(self, namespace, timeout=1, max_attempts=1):
self.namespace = namespace
self._timeout = timeout
self._max_attempts = max_attempts
def _ping_destination(self, dest_address):
ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
ns_ip_wrapper.netns.execute(['ping', '-c', self._max_attempts,
'-W', self._timeout, dest_address])
def assert_ping(self, dst_ip):
self._ping_destination(dst_ip)
def assert_no_ping(self, dst_ip):
try:
self._ping_destination(dst_ip)
tools.fail("destination ip %(dst_ip)s is replying to ping "
"from namespace %(ns)s, but it shouldn't" %
{'ns': self.namespace, 'dst_ip': dst_ip})
except RuntimeError:
pass
class FakeMachine(fixtures.Fixture):
@ -45,8 +70,9 @@ class FakeMachine(fixtures.Fixture):
def setUp(self):
super(FakeMachine, self).setUp()
self.namespace = self.useFixture(
net_helpers.NamespaceFixture()).name
ns_fixture = self.useFixture(
net_helpers.NamespaceFixture())
self.namespace = ns_fixture.name
self.port = self.useFixture(
net_helpers.PortFixture.get(self.bridge, self.namespace)).port
@ -59,6 +85,14 @@ class FakeMachine(fixtures.Fixture):
ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
return ns_ip_wrapper.netns.execute(*args, **kwargs)
def assert_ping(self, dst_ip):
pinger = Pinger(self.namespace)
pinger.assert_ping(dst_ip)
def assert_no_ping(self, dst_ip):
pinger = Pinger(self.namespace)
pinger.assert_no_ping(dst_ip)
class PeerMachines(fixtures.Fixture):
"""Create 'amount' peered machines on an ip_cidr.

View File

@ -14,9 +14,7 @@
import testscenarios
from neutron.agent.linux import ip_lib
from neutron.tests import base as tests_base
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional import base as functional_base
@ -54,14 +52,3 @@ class BaseOVSLinuxTestCase(testscenarios.WithScenarios, BaseLinuxTestCase):
def setUp(self):
super(BaseOVSLinuxTestCase, self).setUp()
self.config(group='OVS', ovsdb_interface=self.ovsdb_interface)
class BaseIPVethTestCase(functional_base.BaseSudoTestCase):
def prepare_veth_pairs(self):
bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
machines = self.useFixture(
machine_fixtures.PeerMachines(bridge)).machines
self.SRC_ADDRESS = machines[0].ip
self.DST_ADDRESS = machines[1].ip
return [ip_lib.IPWrapper(m.namespace) for m in machines]

View File

@ -25,7 +25,6 @@ import fixtures
from neutron.agent.common import config
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.tests import tools
CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20)
CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5)
@ -99,29 +98,6 @@ def get_unused_port(used, start=1024, end=65535):
return random.choice(list(candidates - used))
class Pinger(object):
def __init__(self, namespace, timeout=1, max_attempts=1):
self.namespace = namespace
self._timeout = timeout
self._max_attempts = max_attempts
def _ping_destination(self, dest_address):
self.namespace.netns.execute(['ping', '-c', self._max_attempts,
'-W', self._timeout, dest_address])
def assert_ping(self, dst_ip):
self._ping_destination(dst_ip)
def assert_no_ping(self, dst_ip):
try:
self._ping_destination(dst_ip)
tools.fail("destination ip %(dst_ip)s is replying to ping"
"from namespace %(ns)s, but it shouldn't" %
{'ns': self.namespace.namespace, 'dst_ip': dst_ip})
except RuntimeError:
pass
class RootHelperProcess(subprocess.Popen):
def __init__(self, cmd, *args, **kwargs):
for arg in ('stdin', 'stdout', 'stderr'):

View File

@ -15,9 +15,9 @@
from neutron.agent.linux import ebtables_driver
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as linux_utils
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional import base
NO_FILTER_APPLY = (
@ -70,36 +70,31 @@ FILTER_APPLY_TEMPLATE = (
"COMMIT")
class EbtablesLowLevelTestCase(base.BaseIPVethTestCase):
class EbtablesLowLevelTestCase(base.BaseSudoTestCase):
def setUp(self):
super(EbtablesLowLevelTestCase, self).setUp()
self.src_ns, self.dst_ns = self.prepare_veth_pairs()
devs = [d for d in self.src_ns.get_devices() if d.name != "lo"]
src_dev_name = devs[0].name
self.ns = self.src_ns.namespace
self.execute = linux_utils.execute
self.pinger = helpers.Pinger(self.src_ns)
bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
self.source, self.destination = self.useFixture(
machine_fixtures.PeerMachines(bridge)).machines
# Extract MAC and IP address of one of my interfaces
self.mac = self.src_ns.device(src_dev_name).link.address
addr = [a for a in
self.src_ns.device(src_dev_name).addr.list()][0]['cidr']
self.addr = addr.split("/")[0]
self.mac = self.source.port.link.address
self.addr = self.source.ip
# Pick one of the namespaces and setup a bridge for the local ethernet
# interface there, because ebtables only works on bridged interfaces.
self.src_ns.netns.execute("brctl addbr mybridge".split())
self.src_ns.netns.execute(("brctl addif mybridge %s" % src_dev_name).
split())
self.source.execute(['brctl', 'addbr', 'mybridge'])
self.source.execute(
['brctl', 'addif', 'mybridge', self.source.port.name])
# Take the IP addrss off one of the interfaces and apply it to the
# bridge interface instead.
dev_source = ip_lib.IPDevice(src_dev_name, self.src_ns.namespace)
dev_mybridge = ip_lib.IPDevice("mybridge", self.src_ns.namespace)
dev_source.addr.delete(addr)
self.source.port.addr.delete(self.source.ip_cidr)
dev_mybridge = ip_lib.IPDevice("mybridge", self.source.namespace)
dev_mybridge.link.set_up()
dev_mybridge.addr.add(addr)
dev_mybridge.addr.add(self.source.ip_cidr)
def _test_basic_port_filter_wrong_mac(self):
# Setup filter with wrong IP/MAC address pair. Basic filters only allow
@ -108,15 +103,13 @@ class EbtablesLowLevelTestCase(base.BaseIPVethTestCase):
mac_ip_pair = dict(mac_addr="11:11:11:22:22:22", ip_addr=self.addr)
filter_apply = FILTER_APPLY_TEMPLATE % mac_ip_pair
ebtables_driver.ebtables_restore(filter_apply,
self.execute,
self.ns)
self.pinger.assert_no_ping(self.DST_ADDRESS)
self.source.execute)
self.source.assert_no_ping(self.destination.ip)
# Assure that ping will work once we unfilter the instance
ebtables_driver.ebtables_restore(NO_FILTER_APPLY,
self.execute,
self.ns)
self.pinger.assert_ping(self.DST_ADDRESS)
self.source.execute)
self.source.assert_ping(self.destination.ip)
def _test_basic_port_filter_correct_mac(self):
# Use the correct IP/MAC address pair for this one.
@ -124,10 +117,9 @@ class EbtablesLowLevelTestCase(base.BaseIPVethTestCase):
filter_apply = FILTER_APPLY_TEMPLATE % mac_ip_pair
ebtables_driver.ebtables_restore(filter_apply,
self.execute,
self.ns)
self.source.execute)
self.pinger.assert_ping(self.DST_ADDRESS)
self.source.assert_ping(self.destination.ip)
def test_ebtables_filtering(self):
# Cannot parallelize those tests. Therefore need to call them

View File

@ -13,10 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ipset_manager
from neutron.agent.linux import iptables_manager
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional import base as functional_base
IPSET_SET = 'test-set'
IPSET_ETHERTYPE = 'IPv4'
@ -24,20 +27,22 @@ ICMP_ACCEPT_RULE = '-p icmp -m set --match-set %s src -j ACCEPT' % IPSET_SET
UNRELATED_IP = '1.1.1.1'
class IpsetBase(base.BaseIPVethTestCase):
class IpsetBase(functional_base.BaseSudoTestCase):
def setUp(self):
super(IpsetBase, self).setUp()
self.src_ns, self.dst_ns = self.prepare_veth_pairs()
self.ipset = self._create_ipset_manager_and_set(self.dst_ns,
IPSET_SET)
bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
self.source, self.destination = self.useFixture(
machine_fixtures.PeerMachines(bridge)).machines
self.ipset = self._create_ipset_manager_and_set(
ip_lib.IPWrapper(self.destination.namespace), IPSET_SET)
self.dst_iptables = iptables_manager.IptablesManager(
namespace=self.dst_ns.namespace)
namespace=self.destination.namespace)
self._add_iptables_ipset_rules(self.dst_iptables)
self.pinger = helpers.Pinger(self.src_ns)
def _create_ipset_manager_and_set(self, dst_ns, set_name):
ipset = ipset_manager.IpsetManager(
@ -61,28 +66,28 @@ class IpsetBase(base.BaseIPVethTestCase):
class IpsetManagerTestCase(IpsetBase):
def test_add_member_allows_ping(self):
self.pinger.assert_no_ping(self.DST_ADDRESS)
self.ipset._add_member_to_set(IPSET_SET, self.SRC_ADDRESS)
self.pinger.assert_ping(self.DST_ADDRESS)
self.source.assert_no_ping(self.destination.ip)
self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
self.source.assert_ping(self.destination.ip)
def test_del_member_denies_ping(self):
self.ipset._add_member_to_set(IPSET_SET, self.SRC_ADDRESS)
self.pinger.assert_ping(self.DST_ADDRESS)
self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
self.source.assert_ping(self.destination.ip)
self.ipset._del_member_from_set(IPSET_SET, self.SRC_ADDRESS)
self.pinger.assert_no_ping(self.DST_ADDRESS)
self.ipset._del_member_from_set(IPSET_SET, self.source.ip)
self.source.assert_no_ping(self.destination.ip)
def test_refresh_ipset_allows_ping(self):
self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP], IPSET_ETHERTYPE)
self.pinger.assert_no_ping(self.DST_ADDRESS)
self.source.assert_no_ping(self.destination.ip)
self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.SRC_ADDRESS],
self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.source.ip],
IPSET_ETHERTYPE)
self.pinger.assert_ping(self.DST_ADDRESS)
self.source.assert_ping(self.destination.ip)
self.ipset._refresh_set(IPSET_SET, [self.SRC_ADDRESS, UNRELATED_IP],
self.ipset._refresh_set(IPSET_SET, [self.source.ip, UNRELATED_IP],
IPSET_ETHERTYPE)
self.pinger.assert_ping(self.DST_ADDRESS)
self.source.assert_ping(self.destination.ip)
def test_destroy_ipset_set(self):
self.assertRaises(RuntimeError, self.ipset._destroy, IPSET_SET)

View File

@ -16,15 +16,19 @@ import os.path
import testtools
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.agent.linux import utils
from neutron.tests import base
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base as linux_base
from neutron.tests.functional.agent.linux.bin import ipt_binname
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional import base as functional_base
class IptablesManagerTestCase(linux_base.BaseIPVethTestCase):
class IptablesManagerTestCase(functional_base.BaseSudoTestCase):
DIRECTION_CHAIN_MAPPER = {'ingress': 'INPUT',
'egress': 'OUTPUT'}
PROTOCOL_BLOCK_RULE = '-p %s -j DROP'
@ -32,17 +36,21 @@ class IptablesManagerTestCase(linux_base.BaseIPVethTestCase):
def setUp(self):
super(IptablesManagerTestCase, self).setUp()
self.client_ns, self.server_ns = self.prepare_veth_pairs()
bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
self.client, self.server = self.useFixture(
machine_fixtures.PeerMachines(bridge)).machines
self.client_fw, self.server_fw = self.create_firewalls()
# The port is used in isolated namespace that precludes possibility of
# port conflicts
self.port = helpers.get_free_namespace_port(self.server_ns.namespace)
self.port = helpers.get_free_namespace_port(self.server.namespace)
def create_firewalls(self):
client_iptables = iptables_manager.IptablesManager(
namespace=self.client_ns.namespace)
namespace=self.client.namespace)
server_iptables = iptables_manager.IptablesManager(
namespace=self.server_ns.namespace)
namespace=self.server.namespace)
return client_iptables, server_iptables
@ -71,49 +79,48 @@ class IptablesManagerTestCase(linux_base.BaseIPVethTestCase):
return chain, rule
def _test_with_nc(self, fw_manager, direction, port, udp):
netcat = helpers.NetcatTester(self.client_ns, self.server_ns,
self.DST_ADDRESS, self.port,
run_as_root=True, udp=udp)
netcat = helpers.NetcatTester(
ip_lib.IPWrapper(self.client.namespace),
ip_lib.IPWrapper(self.server.namespace),
self.server.ip, self.port, run_as_root=True, udp=udp)
self.addCleanup(netcat.stop_processes)
protocol = 'tcp'
if udp:
protocol = 'udp'
self.assertTrue(netcat.test_connectivity())
self.filter_add_rule(
fw_manager, self.DST_ADDRESS, direction, protocol, port)
fw_manager, self.server.ip, direction, protocol, port)
with testtools.ExpectedException(RuntimeError):
netcat.test_connectivity()
self.filter_remove_rule(
fw_manager, self.DST_ADDRESS, direction, protocol, port)
fw_manager, self.server.ip, direction, protocol, port)
self.assertTrue(netcat.test_connectivity(True))
def test_icmp(self):
pinger = helpers.Pinger(self.client_ns)
pinger.assert_ping(self.DST_ADDRESS)
self.client.assert_ping(self.server.ip)
self.server_fw.ipv4['filter'].add_rule('INPUT',
linux_base.ICMP_BLOCK_RULE)
self.server_fw.apply()
pinger.assert_no_ping(self.DST_ADDRESS)
self.client.assert_no_ping(self.server.ip)
self.server_fw.ipv4['filter'].remove_rule('INPUT',
linux_base.ICMP_BLOCK_RULE)
self.server_fw.apply()
pinger.assert_ping(self.DST_ADDRESS)
self.client.assert_ping(self.server.ip)
def test_mangle_icmp(self):
pinger = helpers.Pinger(self.client_ns)
pinger.assert_ping(self.DST_ADDRESS)
self.client.assert_ping(self.server.ip)
self.server_fw.ipv4['mangle'].add_rule('INPUT',
linux_base.ICMP_MARK_RULE)
self.server_fw.ipv4['filter'].add_rule('INPUT',
linux_base.MARKED_BLOCK_RULE)
self.server_fw.apply()
pinger.assert_no_ping(self.DST_ADDRESS)
self.client.assert_no_ping(self.server.ip)
self.server_fw.ipv4['mangle'].remove_rule('INPUT',
linux_base.ICMP_MARK_RULE)
self.server_fw.ipv4['filter'].remove_rule('INPUT',
linux_base.MARKED_BLOCK_RULE)
self.server_fw.apply()
pinger.assert_ping(self.DST_ADDRESS)
self.client.assert_ping(self.server.ip)
def test_tcp_input_port(self):
self._test_with_nc(self.server_fw, 'ingress', self.port, udp=False)

View File

@ -16,12 +16,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_firewall
from neutron.agent import securitygroups_rpc as sg_cfg
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional import base
from oslo_config import cfg
@ -63,8 +61,6 @@ class IptablesFirewallTestCase(base.BaseSudoTestCase):
# setup firewall on bridge and send packet from src_veth and observe
# if sent packet can be observed on dst_veth
def test_port_sec_within_firewall(self):
client_ip_wrapper = ip_lib.IPWrapper(self.client.namespace)
pinger = helpers.Pinger(client_ip_wrapper)
# update the sg_group to make ping pass
sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress',
@ -76,13 +72,13 @@ class IptablesFirewallTestCase(base.BaseSudoTestCase):
self.FAKE_SECURITY_GROUP_ID,
sg_rules)
self.firewall.prepare_port_filter(self.src_port_desc)
pinger.assert_ping(self.server.ip)
self.client.assert_ping(self.server.ip)
# modify the src_veth's MAC and test again
self._set_src_mac(self.MAC_SPOOFED)
pinger.assert_no_ping(self.server.ip)
self.client.assert_no_ping(self.server.ip)
# update the port's port_security_enabled value and test again
self.src_port_desc['port_security_enabled'] = False
self.firewall.update_port_filter(self.src_port_desc)
pinger.assert_ping(self.server.ip)
self.client.assert_ping(self.server.ip)

View File

@ -15,14 +15,14 @@
from neutron.cmd.sanity import checks
from neutron.plugins.openvswitch.agent import ovs_neutron_agent as ovsagt
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional.agent import test_ovs_lib
from neutron.tests.functional import base
class ARPSpoofTestCase(test_ovs_lib.OVSBridgeTestBase,
base.BaseIPVethTestCase):
base.BaseSudoTestCase):
def setUp(self):
if not checks.arp_header_match_supported():
@ -35,7 +35,8 @@ class ARPSpoofTestCase(test_ovs_lib.OVSBridgeTestBase,
self.dst_addr = '192.168.0.2'
self.src_ns = self._create_namespace()
self.dst_ns = self._create_namespace()
self.pinger = helpers.Pinger(self.src_ns, max_attempts=2)
self.pinger = machine_fixtures.Pinger(
self.src_ns.namespace, max_attempts=2)
self.src_p = self.useFixture(
net_helpers.OVSPortFixture(self.br, self.src_ns.namespace)).port
self.dst_p = self.useFixture(