Move NetcatTester to common/net_helpers

The NetcatTester is a testing tool that can be used also in fullstack
tests so I think it should go there to avoid imports in fullstack tests
from functional.

Tests for original helpers module was removed.

Change-Id: I7229eba1dbc2ca3d524a1a021256b6202f4aecee
This commit is contained in:
Jakub Libosvar 2015-06-17 13:10:13 +00:00
parent b622e6538a
commit b9e9cfb08b
6 changed files with 244 additions and 278 deletions

View File

@ -14,14 +14,23 @@
#
import abc
import functools
import os
import random
import re
import select
import shlex
import subprocess
import netaddr
from oslo_utils import uuidutils
import six
from neutron.agent.common import config
from neutron.agent.common import ovs_lib
from neutron.agent.linux import bridge_lib
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import constants as n_const
from neutron.tests import base as tests_base
from neutron.tests.common import base as common_base
@ -33,6 +42,16 @@ PORT_PREFIX = 'test-port'
VETH0_PREFIX = 'test-veth0'
VETH1_PREFIX = 'test-veth1'
SS_SOURCE_PORT_PATTERN = re.compile(
r'^.*\s+\d+\s+.*:(?P<port>\d+)\s+[0-9:].*')
READ_TIMEOUT = os.environ.get('OS_TEST_READ_TIMEOUT', 5)
CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20)
CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5)
TRANSPORT_PROTOCOLS = (n_const.PROTO_NAME_TCP, n_const.PROTO_NAME_UDP)
def get_rand_port_name():
return tests_base.get_rand_name(max_length=n_const.DEVICE_NAME_MAX_LEN,
@ -105,6 +124,209 @@ def assert_no_arping(src_namespace, dst_ip, source=None, timeout=1, count=1):
{'ns': src_namespace, 'destination': dst_ip})
def _get_source_ports_from_ss_output(output):
ports = set()
for line in output.splitlines():
match = SS_SOURCE_PORT_PATTERN.match(line)
if match:
ports.add(match.group('port'))
return ports
def get_unused_port(used, start=1024, end=65535):
candidates = set(range(start, end + 1))
return random.choice(list(candidates - used))
def get_free_namespace_port(protocol, namespace=None):
"""Return an unused port from given namespace
WARNING: This function returns a port that is free at the execution time of
this function. If this port is used later for binding then there
is a potential danger that port will be no longer free. It's up to
the programmer to handle error if port is already in use.
:param protocol: Return free port for given protocol. Supported protocols
are 'tcp' and 'udp'.
"""
if protocol == n_const.PROTO_NAME_TCP:
param = '-tna'
elif protocol == n_const.PROTO_NAME_UDP:
param = '-una'
else:
raise ValueError("Unsupported procotol %s" % protocol)
ip_wrapper = ip_lib.IPWrapper(namespace=namespace)
output = ip_wrapper.netns.execute(['ss', param])
used_ports = _get_source_ports_from_ss_output(output)
return get_unused_port(used_ports)
class RootHelperProcess(subprocess.Popen):
def __init__(self, cmd, *args, **kwargs):
for arg in ('stdin', 'stdout', 'stderr'):
kwargs.setdefault(arg, subprocess.PIPE)
self.namespace = kwargs.pop('namespace', None)
self.cmd = cmd
if self.namespace is not None:
cmd = ['ip', 'netns', 'exec', self.namespace] + cmd
root_helper = config.get_root_helper(utils.cfg.CONF)
cmd = shlex.split(root_helper) + cmd
self.child_pid = None
super(RootHelperProcess, self).__init__(cmd, *args, **kwargs)
self._wait_for_child_process()
def kill(self):
pid = self.child_pid or str(self.pid)
utils.execute(['kill', '-9', pid], run_as_root=True)
def read_stdout(self, timeout=None):
return self._read_stream(self.stdout, timeout)
@staticmethod
def _read_stream(stream, timeout):
if timeout:
poller = select.poll()
poller.register(stream.fileno())
poll_predicate = functools.partial(poller.poll, 1)
utils.wait_until_true(poll_predicate, timeout, 0.1,
RuntimeError(
'No output in %.2f seconds' % timeout))
return stream.readline()
def writeline(self, data):
self.stdin.write(data + os.linesep)
self.stdin.flush()
def _wait_for_child_process(self, timeout=CHILD_PROCESS_TIMEOUT,
sleep=CHILD_PROCESS_SLEEP):
def child_is_running():
child_pid = utils.get_root_helper_child_pid(
self.pid, run_as_root=True)
if utils.pid_invoked_with_cmdline(child_pid, self.cmd):
return True
utils.wait_until_true(
child_is_running,
timeout,
exception=RuntimeError("Process %s hasn't been spawned "
"in %d seconds" % (self.cmd, timeout)))
self.child_pid = utils.get_root_helper_child_pid(
self.pid, run_as_root=True)
class NetcatTester(object):
TESTING_STRING = 'foo'
TCP = n_const.PROTO_NAME_TCP
UDP = n_const.PROTO_NAME_UDP
def __init__(self, client_namespace, server_namespace, address,
dst_port, protocol, server_address='0.0.0.0', src_port=None):
"""
Tool for testing connectivity on transport layer using netcat
executable.
The processes are spawned lazily.
:param client_namespace: Namespace in which netcat process that
connects to other netcat will be spawned
:param server_namespace: Namespace in which listening netcat process
will be spawned
:param address: Server address from client point of view
:param dst_port: Port on which netcat listens
:param protocol: Transport protocol, either 'tcp' or 'udp'
:param server_address: Address in server namespace on which netcat
should listen
:param src_port: Source port of netcat process spawned in client
namespace - packet will have src_port in TCP/UDP
header with this value
"""
self.client_namespace = client_namespace
self.server_namespace = server_namespace
self._client_process = None
self._server_process = None
self.address = address
self.server_address = server_address
self.dst_port = str(dst_port)
self.src_port = str(src_port) if src_port else None
if protocol not in TRANSPORT_PROTOCOLS:
raise ValueError("Unsupported protocol %s" % protocol)
self.protocol = protocol
@property
def client_process(self):
if not self._client_process:
self.establish_connection()
return self._client_process
@property
def server_process(self):
if not self._server_process:
self._spawn_server_process()
return self._server_process
def _spawn_server_process(self):
self._server_process = self._spawn_nc_in_namespace(
self.server_namespace,
address=self.server_address,
listen=True)
def establish_connection(self):
if self._client_process:
raise RuntimeError('%(proto)s connection to $(ip_addr)s is already'
' established' %
{'proto': self.protocol,
'ip_addr': self.address})
if not self._server_process:
self._spawn_server_process()
self._client_process = self._spawn_nc_in_namespace(
self.client_namespace,
address=self.address)
if self.protocol == self.UDP:
# Create an entry in conntrack table for UDP packets
self.client_process.writeline(self.TESTING_STRING)
def test_connectivity(self, respawn=False):
stop_required = (respawn and self._client_process and
self._client_process.poll() is not None)
if stop_required:
self.stop_processes()
self.client_process.writeline(self.TESTING_STRING)
message = self.server_process.read_stdout(READ_TIMEOUT).strip()
self.server_process.writeline(message)
message = self.client_process.read_stdout(READ_TIMEOUT).strip()
return message == self.TESTING_STRING
def _spawn_nc_in_namespace(self, namespace, address, listen=False):
cmd = ['nc', address, self.dst_port]
if self.protocol == self.UDP:
cmd.append('-u')
if listen:
cmd.append('-l')
if self.protocol == self.TCP:
cmd.append('-k')
else:
cmd.extend(['-w', '20'])
if self.src_port:
cmd.extend(['-p', self.src_port])
proc = RootHelperProcess(cmd, namespace=namespace)
return proc
def stop_processes(self):
for proc_attr in ('_client_process', '_server_process'):
proc = getattr(self, proc_attr)
if proc:
if proc.poll() is None:
proc.kill()
proc.wait()
setattr(self, proc_attr, None)
class NamespaceFixture(tools.SafeFixture):
"""Create a namespace.

View File

@ -20,7 +20,7 @@ import six
from neutron.common import constants
from neutron.tests import base
from neutron.tests.common import helpers as c_helpers
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.common import net_helpers
from neutron.tests import tools
@ -140,7 +140,8 @@ class NeutronConfigFixture(ConfigFixture):
This might fail if some other process occupies this port after this
function finished but before the neutron-server process started.
"""
return str(helpers.get_free_namespace_port(constants.PROTO_NAME_TCP))
return str(net_helpers.get_free_namespace_port(
constants.PROTO_NAME_TCP))
def _generate_api_paste(self):
return c_helpers.find_sample_file('api-paste.ini')

View File

@ -12,29 +12,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import os
import random
import re
import select
import shlex
import subprocess
from neutron.agent.common import config
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import constants
from neutron.tests import tools
CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20)
CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5)
READ_TIMEOUT = os.environ.get('OS_TEST_READ_TIMEOUT', 5)
SS_SOURCE_PORT_PATTERN = re.compile(
r'^.*\s+\d+\s+.*:(?P<port>\d+)\s+[0-9:].*')
TRANSPORT_PROTOCOLS = (constants.PROTO_NAME_TCP, constants.PROTO_NAME_UDP)
class RecursivePermDirFixture(tools.SafeFixture):
"""Ensure at least perms permissions on directory and ancestors."""
@ -54,206 +35,3 @@ class RecursivePermDirFixture(tools.SafeFixture):
os.chmod(current_directory, perms | self.least_perms)
previous_directory = current_directory
current_directory = os.path.dirname(current_directory)
def get_free_namespace_port(protocol, namespace=None):
"""Return an unused port from given namespace
WARNING: This function returns a port that is free at the execution time of
this function. If this port is used later for binding then there
is a potential danger that port will be no longer free. It's up to
the programmer to handle error if port is already in use.
:param protocol: Return free port for given protocol. Supported protocols
are 'tcp' and 'udp'.
"""
if protocol == constants.PROTO_NAME_TCP:
param = '-tna'
elif protocol == constants.PROTO_NAME_UDP:
param = '-una'
else:
raise ValueError("Unsupported procotol %s" % protocol)
ip_wrapper = ip_lib.IPWrapper(namespace=namespace)
output = ip_wrapper.netns.execute(['ss', param])
used_ports = _get_source_ports_from_ss_output(output)
return get_unused_port(used_ports)
def _get_source_ports_from_ss_output(output):
ports = set()
for line in output.splitlines():
match = SS_SOURCE_PORT_PATTERN.match(line)
if match:
ports.add(match.group('port'))
return ports
def get_unused_port(used, start=1024, end=65535):
candidates = set(range(start, end + 1))
return random.choice(list(candidates - used))
class RootHelperProcess(subprocess.Popen):
def __init__(self, cmd, *args, **kwargs):
for arg in ('stdin', 'stdout', 'stderr'):
kwargs.setdefault(arg, subprocess.PIPE)
self.namespace = kwargs.pop('namespace', None)
self.cmd = cmd
if self.namespace is not None:
cmd = ['ip', 'netns', 'exec', self.namespace] + cmd
root_helper = config.get_root_helper(utils.cfg.CONF)
cmd = shlex.split(root_helper) + cmd
self.child_pid = None
super(RootHelperProcess, self).__init__(cmd, *args, **kwargs)
self._wait_for_child_process()
def kill(self):
pid = self.child_pid or str(self.pid)
utils.execute(['kill', '-9', pid], run_as_root=True)
def read_stdout(self, timeout=None):
return self._read_stream(self.stdout, timeout)
@staticmethod
def _read_stream(stream, timeout):
if timeout:
poller = select.poll()
poller.register(stream.fileno())
poll_predicate = functools.partial(poller.poll, 1)
utils.wait_until_true(poll_predicate, timeout, 0.1,
RuntimeError(
'No output in %.2f seconds' % timeout))
return stream.readline()
def writeline(self, data):
self.stdin.write(data + os.linesep)
self.stdin.flush()
def _wait_for_child_process(self, timeout=CHILD_PROCESS_TIMEOUT,
sleep=CHILD_PROCESS_SLEEP):
def child_is_running():
child_pid = utils.get_root_helper_child_pid(
self.pid, run_as_root=True)
if utils.pid_invoked_with_cmdline(child_pid, self.cmd):
return True
utils.wait_until_true(
child_is_running,
timeout,
exception=RuntimeError("Process %s hasn't been spawned "
"in %d seconds" % (self.cmd, timeout)))
self.child_pid = utils.get_root_helper_child_pid(
self.pid, run_as_root=True)
class NetcatTester(object):
TESTING_STRING = 'foo'
TCP = constants.PROTO_NAME_TCP
UDP = constants.PROTO_NAME_UDP
def __init__(self, client_namespace, server_namespace, address,
dst_port, protocol, server_address='0.0.0.0', src_port=None):
"""
Tool for testing connectivity on transport layer using netcat
executable.
The processes are spawned lazily.
:param client_namespace: Namespace in which netcat process that
connects to other netcat will be spawned
:param server_namespace: Namespace in which listening netcat process
will be spawned
:param address: Server address from client point of view
:param dst_port: Port on which netcat listens
:param protocol: Transport protocol, either 'tcp' or 'udp'
:param server_address: Address in server namespace on which netcat
should listen
:param src_port: Source port of netcat process spawned in client
namespace - packet will have src_port in TCP/UDP
header with this value
"""
self.client_namespace = client_namespace
self.server_namespace = server_namespace
self._client_process = None
self._server_process = None
self.address = address
self.server_address = server_address
self.dst_port = str(dst_port)
self.src_port = str(src_port) if src_port else None
if protocol not in TRANSPORT_PROTOCOLS:
raise ValueError("Unsupported protocol %s" % protocol)
self.protocol = protocol
@property
def client_process(self):
if not self._client_process:
self.establish_connection()
return self._client_process
@property
def server_process(self):
if not self._server_process:
self._spawn_server_process()
return self._server_process
def _spawn_server_process(self):
self._server_process = self._spawn_nc_in_namespace(
self.server_namespace,
address=self.server_address,
listen=True)
def establish_connection(self):
if self._client_process:
raise RuntimeError('%(proto)s connection to $(ip_addr)s is already'
' established' %
{'proto': self.protocol,
'ip_addr': self.address})
if not self._server_process:
self._spawn_server_process()
self._client_process = self._spawn_nc_in_namespace(
self.client_namespace,
address=self.address)
if self.protocol == self.UDP:
# Create an entry in conntrack table for UDP packets
self.client_process.writeline(self.TESTING_STRING)
def test_connectivity(self, respawn=False):
stop_required = (respawn and self._client_process and
self._client_process.poll() is not None)
if stop_required:
self.stop_processes()
self.client_process.writeline(self.TESTING_STRING)
message = self.server_process.read_stdout(READ_TIMEOUT).strip()
self.server_process.writeline(message)
message = self.client_process.read_stdout(READ_TIMEOUT).strip()
return message == self.TESTING_STRING
def _spawn_nc_in_namespace(self, namespace, address, listen=False):
cmd = ['nc', address, self.dst_port]
if self.protocol == self.UDP:
cmd.append('-u')
if listen:
cmd.append('-l')
if self.protocol == self.TCP:
cmd.append('-k')
else:
cmd.extend(['-w', '20'])
if self.src_port:
cmd.extend(['-p', self.src_port])
proc = RootHelperProcess(cmd, namespace=namespace)
return proc
def stop_processes(self):
for proc_attr in ('_client_process', '_server_process'):
proc = getattr(self, proc_attr)
if proc:
if proc.poll() is None:
proc.kill()
proc.wait()
setattr(self, proc_attr, None)

View File

@ -1,34 +0,0 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional import base
class TestRootHelperProcess(base.BaseSudoTestCase):
def test_process_read_write(self):
proc = helpers.RootHelperProcess(['tee'])
proc.writeline('foo')
output = proc.read_stdout(helpers.READ_TIMEOUT)
self.assertEqual('foo\n', output)
def test_process_kill(self):
with self.assert_max_execution_time(100):
proc = helpers.RootHelperProcess(['tee'])
proc.kill()
proc.wait()
# sudo returns 137 and
# rootwrap returns 247 (bug 1364822)
self.assertIn(proc.returncode, [137, 247])

View File

@ -24,7 +24,6 @@ from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base as linux_base
from neutron.tests.functional.agent.linux.bin import ipt_binname
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional import base as functional_base
@ -44,8 +43,8 @@ class IptablesManagerTestCase(functional_base.BaseSudoTestCase):
self.client_fw, self.server_fw = self.create_firewalls()
# The port is used in isolated namespace that precludes possibility of
# port conflicts
self.port = helpers.get_free_namespace_port(constants.PROTO_NAME_TCP,
self.server.namespace)
self.port = net_helpers.get_free_namespace_port(
constants.PROTO_NAME_TCP, self.server.namespace)
def create_firewalls(self):
client_iptables = iptables_manager.IptablesManager(
@ -80,7 +79,7 @@ class IptablesManagerTestCase(functional_base.BaseSudoTestCase):
return chain, rule
def _test_with_nc(self, fw_manager, direction, port, protocol):
netcat = helpers.NetcatTester(
netcat = net_helpers.NetcatTester(
self.client.namespace, self.server.namespace,
self.server.ip, self.port, protocol)
self.addCleanup(netcat.stop_processes)
@ -121,35 +120,35 @@ class IptablesManagerTestCase(functional_base.BaseSudoTestCase):
def test_tcp_input_port(self):
self._test_with_nc(self.server_fw, 'ingress', self.port,
protocol=helpers.NetcatTester.TCP)
protocol=net_helpers.NetcatTester.TCP)
def test_tcp_output_port(self):
self._test_with_nc(self.client_fw, 'egress', self.port,
protocol=helpers.NetcatTester.TCP)
protocol=net_helpers.NetcatTester.TCP)
def test_tcp_input(self):
self._test_with_nc(self.server_fw, 'ingress', port=None,
protocol=helpers.NetcatTester.TCP)
protocol=net_helpers.NetcatTester.TCP)
def test_tcp_output(self):
self._test_with_nc(self.client_fw, 'egress', port=None,
protocol=helpers.NetcatTester.TCP)
protocol=net_helpers.NetcatTester.TCP)
def test_udp_input_port(self):
self._test_with_nc(self.server_fw, 'ingress', self.port,
protocol=helpers.NetcatTester.UDP)
protocol=net_helpers.NetcatTester.UDP)
def test_udp_output_port(self):
self._test_with_nc(self.client_fw, 'egress', self.port,
protocol=helpers.NetcatTester.UDP)
protocol=net_helpers.NetcatTester.UDP)
def test_udp_input(self):
self._test_with_nc(self.server_fw, 'ingress', port=None,
protocol=helpers.NetcatTester.UDP)
protocol=net_helpers.NetcatTester.UDP)
def test_udp_output(self):
self._test_with_nc(self.client_fw, 'egress', port=None,
protocol=helpers.NetcatTester.UDP)
protocol=net_helpers.NetcatTester.UDP)
class IptablesManagerNonRootTestCase(base.BaseTestCase):

View File

@ -400,8 +400,8 @@ class L3AgentTestCase(L3AgentTestFramework):
router_info = self.generate_router_info(enable_ha=False)
router = self.manage_router(self.agent, router_info)
port = helpers.get_free_namespace_port(l3_constants.PROTO_NAME_TCP,
router.ns_name)
port = net_helpers.get_free_namespace_port(l3_constants.PROTO_NAME_TCP,
router.ns_name)
client_address = '19.4.4.3'
server_address = '35.4.0.4'
@ -413,9 +413,9 @@ class L3AgentTestCase(L3AgentTestFramework):
router.process(self.agent)
router_ns = ip_lib.IPWrapper(namespace=router.ns_name)
netcat = helpers.NetcatTester(router.ns_name, router.ns_name,
client_address, port,
protocol=helpers.NetcatTester.TCP)
netcat = net_helpers.NetcatTester(
router.ns_name, router.ns_name, client_address, port,
protocol=net_helpers.NetcatTester.TCP)
self.addCleanup(netcat.stop_processes)
def assert_num_of_conntrack_rules(n):
@ -705,13 +705,13 @@ class L3AgentTestCase(L3AgentTestFramework):
self._add_fip(router, dst_fip, fixed_address=dst_machine.ip)
router.process(self.agent)
protocol_port = helpers.get_free_namespace_port(
protocol_port = net_helpers.get_free_namespace_port(
l3_constants.PROTO_NAME_TCP, dst_machine.namespace)
# client sends to fip
netcat = helpers.NetcatTester(
netcat = net_helpers.NetcatTester(
src_machine.namespace, dst_machine.namespace,
dst_fip, protocol_port,
protocol=helpers.NetcatTester.TCP)
protocol=net_helpers.NetcatTester.TCP)
self.addCleanup(netcat.stop_processes)
self.assertTrue(netcat.test_connectivity())