Merge "Add functional tests for IptablesManager using tcp/udp"

This commit is contained in:
Jenkins 2015-01-26 22:38:58 +00:00 committed by Gerrit Code Review
commit fae9ae618d
3 changed files with 224 additions and 22 deletions

View File

@ -18,3 +18,9 @@ ping_filter: CommandFilter, ping, root
curl_filter: CommandFilter, curl, root
tee_filter: CommandFilter, tee, root
tee_kill: KillFilter, root, tee, -9
nc_filter: CommandFilter, nc, root
# netcat has different binaries depending on linux distribution
nc_kill: KillFilter, root, nc, -9
ncbsd_kill: KillFilter, root, nc.openbsd, -9
ncat_kill: KillFilter, root, ncat, -9
ss_filter: CommandFilter, ss, root

View File

@ -12,19 +12,63 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import functools
import os
import random
import re
import select
import shlex
import subprocess
import eventlet
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20)
CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5)
READ_TIMEOUT = os.environ.get('OS_TEST_READ_TIMEOUT', 5)
SS_SOURCE_PORT_PATTERN = re.compile(
r'^.*\s+\d+\s+.*:(?P<port>\d+)\s+[0-9:].*')
def get_free_namespace_port(tcp=True, root_helper=None, namespace=None):
"""Return an unused port from given namespace
WARNING: This function returns a port that is free at the execution time of
this function. If this port is used later for binding then there
is a potential danger that port will be no longer free. It's up to
the programmer to handle error if port is already in use.
:param tcp: Return free port for TCP protocol if set to True, return free
port for UDP protocol if set to False
"""
if tcp:
param = '-tna'
else:
param = '-una'
ip_wrapper = ip_lib.IPWrapper(root_helper, namespace)
output = ip_wrapper.netns.execute(['ss', param])
used_ports = _get_source_ports_from_ss_output(output)
return get_unused_port(used_ports)
def _get_source_ports_from_ss_output(output):
ports = set()
for line in output.splitlines():
match = SS_SOURCE_PORT_PATTERN.match(line)
if match:
ports.add(match.group('port'))
return ports
def get_unused_port(used, start=1024, end=65535):
candidates = set(range(start, end + 1))
return random.choice(list(candidates - used))
def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
"""
@ -165,3 +209,73 @@ class RootHelperProcess(subprocess.Popen):
"in %d seconds" % (self.cmd, timeout)))
self.child_pid = utils.get_root_helper_child_pid(
self.pid, root_helper=self.root_helper)
class NetcatTester(object):
TESTING_STRING = 'foo'
def __init__(self, client_namespace, server_namespace, address, port,
root_helper='', udp=False):
self.client_namespace = client_namespace
self.server_namespace = server_namespace
self._client_process = None
self._server_process = None
self.address = address
self.port = str(port)
self.root_helper = root_helper
self.udp = udp
@property
def client_process(self):
if not self._client_process:
if not self._server_process:
self._spawn_server_process()
self._client_process = self._spawn_nc_in_namespace(
self.client_namespace.namespace)
return self._client_process
@property
def server_process(self):
if not self._server_process:
self._spawn_server_process()
return self._server_process
def _spawn_server_process(self):
self._server_process = self._spawn_nc_in_namespace(
self.server_namespace.namespace, listen=True)
def test_connectivity(self, respawn=False):
stop_required = (respawn and self._client_process and
self._client_process.poll() is not None)
if stop_required:
self.stop_processes()
self.client_process.writeline(self.TESTING_STRING)
message = self.server_process.read_stdout(READ_TIMEOUT).strip()
self.server_process.writeline(message)
message = self.client_process.read_stdout(READ_TIMEOUT).strip()
return message == self.TESTING_STRING
def _spawn_nc_in_namespace(self, namespace, listen=False):
cmd = ['nc', self.address, self.port]
if self.udp:
cmd.append('-u')
if listen:
cmd.append('-l')
if not self.udp:
cmd.append('-k')
else:
cmd.extend(['-w', '20'])
proc = RootHelperProcess(cmd, namespace=namespace,
root_helper=self.root_helper)
return proc
def stop_processes(self):
for proc_attr in ('_client_process', '_server_process'):
proc = getattr(self, proc_attr)
if proc:
if proc.poll() is None:
proc.kill()
proc.wait()
setattr(self, proc_attr, None)

View File

@ -12,39 +12,121 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from neutron.agent.linux import iptables_manager
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
class IptablesManagerTestCase(base.BaseIPVethTestCase):
DIRECTION_CHAIN_MAPPER = {'ingress': 'INPUT',
'egress': 'OUTPUT'}
PROTOCOL_BLOCK_RULE = '-p %s -j DROP'
PROTOCOL_PORT_BLOCK_RULE = '-p %s --dport %d -j DROP'
def setUp(self):
super(IptablesManagerTestCase, self).setUp()
self.src_ns, self.dst_ns = self.prepare_veth_pairs()
self.iptables = iptables_manager.IptablesManager(
self.client_ns, self.server_ns = self.prepare_veth_pairs()
self.client_fw, self.server_fw = self.create_firewalls()
# The port is used in isolated namespace that precludes possibility of
# port conflicts
self.port = helpers.get_free_namespace_port(self.server_ns.namespace)
def create_firewalls(self):
client_iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
namespace=self.dst_ns.namespace)
namespace=self.client_ns.namespace)
server_iptables = iptables_manager.IptablesManager(
root_helper=self.root_helper,
namespace=self.server_ns.namespace)
return client_iptables, server_iptables
def filter_add_rule(self, fw_manager, address, direction, protocol, port):
self._ipv4_filter_execute(fw_manager, 'add_rule', direction, protocol,
port)
def filter_remove_rule(self, fw_manager, address, direction, protocol,
port):
self._ipv4_filter_execute(fw_manager, 'remove_rule', direction,
protocol, port)
def _ipv4_filter_execute(self, fw_manager, method, direction, protocol,
port):
chain, rule = self._get_chain_and_rule(direction, protocol, port)
method = getattr(fw_manager.ipv4['filter'], method)
method(chain, rule)
fw_manager.apply()
def _get_chain_and_rule(self, direction, protocol, port):
chain = self.DIRECTION_CHAIN_MAPPER[direction]
if port:
rule = self.PROTOCOL_PORT_BLOCK_RULE % (protocol, port)
else:
rule = self.PROTOCOL_BLOCK_RULE % protocol
return chain, rule
def _test_with_nc(self, fw_manager, direction, port, udp):
netcat = helpers.NetcatTester(self.client_ns, self.server_ns,
self.DST_ADDRESS, self.port,
self.root_helper, udp=udp)
self.addCleanup(netcat.stop_processes)
protocol = 'tcp'
if udp:
protocol = 'udp'
self.assertTrue(netcat.test_connectivity())
self.filter_add_rule(
fw_manager, self.DST_ADDRESS, direction, protocol, port)
with testtools.ExpectedException(RuntimeError):
netcat.test_connectivity()
self.filter_remove_rule(
fw_manager, self.DST_ADDRESS, direction, protocol, port)
self.assertTrue(netcat.test_connectivity(True))
def test_icmp(self):
self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.iptables.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
self.iptables.apply()
self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.iptables.ipv4['filter'].remove_rule('INPUT',
self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS)
self.server_fw.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
self.server_fw.apply()
self.pinger.assert_no_ping_from_ns(self.client_ns, self.DST_ADDRESS)
self.server_fw.ipv4['filter'].remove_rule('INPUT',
base.ICMP_BLOCK_RULE)
self.iptables.apply()
self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.server_fw.apply()
self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS)
def test_mangle_icmp(self):
self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.iptables.ipv4['mangle'].add_rule('INPUT', base.ICMP_MARK_RULE)
self.iptables.ipv4['filter'].add_rule('INPUT', base.MARKED_BLOCK_RULE)
self.iptables.apply()
self.pinger.assert_no_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.iptables.ipv4['mangle'].remove_rule('INPUT',
base.ICMP_MARK_RULE)
self.iptables.ipv4['filter'].remove_rule('INPUT',
base.MARKED_BLOCK_RULE)
self.iptables.apply()
self.pinger.assert_ping_from_ns(self.src_ns, self.DST_ADDRESS)
self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS)
self.server_fw.ipv4['mangle'].add_rule('INPUT', base.ICMP_MARK_RULE)
self.server_fw.ipv4['filter'].add_rule('INPUT', base.MARKED_BLOCK_RULE)
self.server_fw.apply()
self.pinger.assert_no_ping_from_ns(self.client_ns, self.DST_ADDRESS)
self.server_fw.ipv4['mangle'].remove_rule('INPUT',
base.ICMP_MARK_RULE)
self.server_fw.ipv4['filter'].remove_rule('INPUT',
base.MARKED_BLOCK_RULE)
self.server_fw.apply()
self.pinger.assert_ping_from_ns(self.client_ns, self.DST_ADDRESS)
def test_tcp_input_port(self):
self._test_with_nc(self.server_fw, 'ingress', self.port, udp=False)
def test_tcp_output_port(self):
self._test_with_nc(self.client_fw, 'egress', self.port, udp=False)
def test_tcp_input(self):
self._test_with_nc(self.server_fw, 'ingress', port=None, udp=False)
def test_tcp_output(self):
self._test_with_nc(self.client_fw, 'egress', port=None, udp=False)
def test_udp_input_port(self):
self._test_with_nc(self.server_fw, 'ingress', self.port, udp=True)
def test_udp_output_port(self):
self._test_with_nc(self.client_fw, 'egress', self.port, udp=True)
def test_udp_input(self):
self._test_with_nc(self.server_fw, 'ingress', port=None, udp=True)
def test_udp_output(self):
self._test_with_nc(self.client_fw, 'egress', port=None, udp=True)