From b9e9cfb08bf0609dcfea46403c510607e858926a Mon Sep 17 00:00:00 2001 From: Jakub Libosvar Date: Wed, 17 Jun 2015 13:10:13 +0000 Subject: [PATCH] Move NetcatTester to common/net_helpers The NetcatTester is a testing tool that can be used also in fullstack tests so I think it should go there to avoid imports in fullstack tests from functional. Tests for original helpers module was removed. Change-Id: I7229eba1dbc2ca3d524a1a021256b6202f4aecee --- neutron/tests/common/net_helpers.py | 222 ++++++++++++++++++ neutron/tests/fullstack/config_fixtures.py | 5 +- .../tests/functional/agent/linux/helpers.py | 222 ------------------ .../functional/agent/linux/test_helpers.py | 34 --- .../functional/agent/linux/test_iptables.py | 23 +- .../tests/functional/agent/test_l3_agent.py | 16 +- 6 files changed, 244 insertions(+), 278 deletions(-) delete mode 100644 neutron/tests/functional/agent/linux/test_helpers.py diff --git a/neutron/tests/common/net_helpers.py b/neutron/tests/common/net_helpers.py index 2c5bb94b5a0..d0e03cb5feb 100644 --- a/neutron/tests/common/net_helpers.py +++ b/neutron/tests/common/net_helpers.py @@ -14,14 +14,23 @@ # import abc +import functools +import os +import random +import re +import select +import shlex +import subprocess import netaddr from oslo_utils import uuidutils import six +from neutron.agent.common import config from neutron.agent.common import ovs_lib from neutron.agent.linux import bridge_lib from neutron.agent.linux import ip_lib +from neutron.agent.linux import utils from neutron.common import constants as n_const from neutron.tests import base as tests_base from neutron.tests.common import base as common_base @@ -33,6 +42,16 @@ PORT_PREFIX = 'test-port' VETH0_PREFIX = 'test-veth0' VETH1_PREFIX = 'test-veth1' +SS_SOURCE_PORT_PATTERN = re.compile( + r'^.*\s+\d+\s+.*:(?P\d+)\s+[0-9:].*') + +READ_TIMEOUT = os.environ.get('OS_TEST_READ_TIMEOUT', 5) + +CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20) +CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5) + +TRANSPORT_PROTOCOLS = (n_const.PROTO_NAME_TCP, n_const.PROTO_NAME_UDP) + def get_rand_port_name(): return tests_base.get_rand_name(max_length=n_const.DEVICE_NAME_MAX_LEN, @@ -105,6 +124,209 @@ def assert_no_arping(src_namespace, dst_ip, source=None, timeout=1, count=1): {'ns': src_namespace, 'destination': dst_ip}) +def _get_source_ports_from_ss_output(output): + ports = set() + for line in output.splitlines(): + match = SS_SOURCE_PORT_PATTERN.match(line) + if match: + ports.add(match.group('port')) + return ports + + +def get_unused_port(used, start=1024, end=65535): + candidates = set(range(start, end + 1)) + return random.choice(list(candidates - used)) + + +def get_free_namespace_port(protocol, namespace=None): + """Return an unused port from given namespace + + WARNING: This function returns a port that is free at the execution time of + this function. If this port is used later for binding then there + is a potential danger that port will be no longer free. It's up to + the programmer to handle error if port is already in use. + + :param protocol: Return free port for given protocol. Supported protocols + are 'tcp' and 'udp'. + """ + if protocol == n_const.PROTO_NAME_TCP: + param = '-tna' + elif protocol == n_const.PROTO_NAME_UDP: + param = '-una' + else: + raise ValueError("Unsupported procotol %s" % protocol) + + ip_wrapper = ip_lib.IPWrapper(namespace=namespace) + output = ip_wrapper.netns.execute(['ss', param]) + used_ports = _get_source_ports_from_ss_output(output) + + return get_unused_port(used_ports) + + +class RootHelperProcess(subprocess.Popen): + def __init__(self, cmd, *args, **kwargs): + for arg in ('stdin', 'stdout', 'stderr'): + kwargs.setdefault(arg, subprocess.PIPE) + self.namespace = kwargs.pop('namespace', None) + self.cmd = cmd + if self.namespace is not None: + cmd = ['ip', 'netns', 'exec', self.namespace] + cmd + root_helper = config.get_root_helper(utils.cfg.CONF) + cmd = shlex.split(root_helper) + cmd + self.child_pid = None + super(RootHelperProcess, self).__init__(cmd, *args, **kwargs) + self._wait_for_child_process() + + def kill(self): + pid = self.child_pid or str(self.pid) + utils.execute(['kill', '-9', pid], run_as_root=True) + + def read_stdout(self, timeout=None): + return self._read_stream(self.stdout, timeout) + + @staticmethod + def _read_stream(stream, timeout): + if timeout: + poller = select.poll() + poller.register(stream.fileno()) + poll_predicate = functools.partial(poller.poll, 1) + utils.wait_until_true(poll_predicate, timeout, 0.1, + RuntimeError( + 'No output in %.2f seconds' % timeout)) + return stream.readline() + + def writeline(self, data): + self.stdin.write(data + os.linesep) + self.stdin.flush() + + def _wait_for_child_process(self, timeout=CHILD_PROCESS_TIMEOUT, + sleep=CHILD_PROCESS_SLEEP): + def child_is_running(): + child_pid = utils.get_root_helper_child_pid( + self.pid, run_as_root=True) + if utils.pid_invoked_with_cmdline(child_pid, self.cmd): + return True + + utils.wait_until_true( + child_is_running, + timeout, + exception=RuntimeError("Process %s hasn't been spawned " + "in %d seconds" % (self.cmd, timeout))) + self.child_pid = utils.get_root_helper_child_pid( + self.pid, run_as_root=True) + + +class NetcatTester(object): + TESTING_STRING = 'foo' + TCP = n_const.PROTO_NAME_TCP + UDP = n_const.PROTO_NAME_UDP + + def __init__(self, client_namespace, server_namespace, address, + dst_port, protocol, server_address='0.0.0.0', src_port=None): + """ + Tool for testing connectivity on transport layer using netcat + executable. + + The processes are spawned lazily. + + :param client_namespace: Namespace in which netcat process that + connects to other netcat will be spawned + :param server_namespace: Namespace in which listening netcat process + will be spawned + :param address: Server address from client point of view + :param dst_port: Port on which netcat listens + :param protocol: Transport protocol, either 'tcp' or 'udp' + :param server_address: Address in server namespace on which netcat + should listen + :param src_port: Source port of netcat process spawned in client + namespace - packet will have src_port in TCP/UDP + header with this value + + """ + self.client_namespace = client_namespace + self.server_namespace = server_namespace + self._client_process = None + self._server_process = None + self.address = address + self.server_address = server_address + self.dst_port = str(dst_port) + self.src_port = str(src_port) if src_port else None + if protocol not in TRANSPORT_PROTOCOLS: + raise ValueError("Unsupported protocol %s" % protocol) + self.protocol = protocol + + @property + def client_process(self): + if not self._client_process: + self.establish_connection() + return self._client_process + + @property + def server_process(self): + if not self._server_process: + self._spawn_server_process() + return self._server_process + + def _spawn_server_process(self): + self._server_process = self._spawn_nc_in_namespace( + self.server_namespace, + address=self.server_address, + listen=True) + + def establish_connection(self): + if self._client_process: + raise RuntimeError('%(proto)s connection to $(ip_addr)s is already' + ' established' % + {'proto': self.protocol, + 'ip_addr': self.address}) + + if not self._server_process: + self._spawn_server_process() + self._client_process = self._spawn_nc_in_namespace( + self.client_namespace, + address=self.address) + if self.protocol == self.UDP: + # Create an entry in conntrack table for UDP packets + self.client_process.writeline(self.TESTING_STRING) + + def test_connectivity(self, respawn=False): + stop_required = (respawn and self._client_process and + self._client_process.poll() is not None) + if stop_required: + self.stop_processes() + + self.client_process.writeline(self.TESTING_STRING) + message = self.server_process.read_stdout(READ_TIMEOUT).strip() + self.server_process.writeline(message) + message = self.client_process.read_stdout(READ_TIMEOUT).strip() + + return message == self.TESTING_STRING + + def _spawn_nc_in_namespace(self, namespace, address, listen=False): + cmd = ['nc', address, self.dst_port] + if self.protocol == self.UDP: + cmd.append('-u') + if listen: + cmd.append('-l') + if self.protocol == self.TCP: + cmd.append('-k') + else: + cmd.extend(['-w', '20']) + if self.src_port: + cmd.extend(['-p', self.src_port]) + proc = RootHelperProcess(cmd, namespace=namespace) + return proc + + def stop_processes(self): + for proc_attr in ('_client_process', '_server_process'): + proc = getattr(self, proc_attr) + if proc: + if proc.poll() is None: + proc.kill() + proc.wait() + setattr(self, proc_attr, None) + + class NamespaceFixture(tools.SafeFixture): """Create a namespace. diff --git a/neutron/tests/fullstack/config_fixtures.py b/neutron/tests/fullstack/config_fixtures.py index ec1248d1b05..f07993cfaa2 100644 --- a/neutron/tests/fullstack/config_fixtures.py +++ b/neutron/tests/fullstack/config_fixtures.py @@ -20,7 +20,7 @@ import six from neutron.common import constants from neutron.tests import base from neutron.tests.common import helpers as c_helpers -from neutron.tests.functional.agent.linux import helpers +from neutron.tests.common import net_helpers from neutron.tests import tools @@ -140,7 +140,8 @@ class NeutronConfigFixture(ConfigFixture): This might fail if some other process occupies this port after this function finished but before the neutron-server process started. """ - return str(helpers.get_free_namespace_port(constants.PROTO_NAME_TCP)) + return str(net_helpers.get_free_namespace_port( + constants.PROTO_NAME_TCP)) def _generate_api_paste(self): return c_helpers.find_sample_file('api-paste.ini') diff --git a/neutron/tests/functional/agent/linux/helpers.py b/neutron/tests/functional/agent/linux/helpers.py index 92ac9c92bbd..f7dc76099e1 100644 --- a/neutron/tests/functional/agent/linux/helpers.py +++ b/neutron/tests/functional/agent/linux/helpers.py @@ -12,29 +12,10 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import functools import os -import random -import re -import select -import shlex -import subprocess -from neutron.agent.common import config -from neutron.agent.linux import ip_lib -from neutron.agent.linux import utils -from neutron.common import constants from neutron.tests import tools -CHILD_PROCESS_TIMEOUT = os.environ.get('OS_TEST_CHILD_PROCESS_TIMEOUT', 20) -CHILD_PROCESS_SLEEP = os.environ.get('OS_TEST_CHILD_PROCESS_SLEEP', 0.5) -READ_TIMEOUT = os.environ.get('OS_TEST_READ_TIMEOUT', 5) - -SS_SOURCE_PORT_PATTERN = re.compile( - r'^.*\s+\d+\s+.*:(?P\d+)\s+[0-9:].*') - -TRANSPORT_PROTOCOLS = (constants.PROTO_NAME_TCP, constants.PROTO_NAME_UDP) - class RecursivePermDirFixture(tools.SafeFixture): """Ensure at least perms permissions on directory and ancestors.""" @@ -54,206 +35,3 @@ class RecursivePermDirFixture(tools.SafeFixture): os.chmod(current_directory, perms | self.least_perms) previous_directory = current_directory current_directory = os.path.dirname(current_directory) - - -def get_free_namespace_port(protocol, namespace=None): - """Return an unused port from given namespace - - WARNING: This function returns a port that is free at the execution time of - this function. If this port is used later for binding then there - is a potential danger that port will be no longer free. It's up to - the programmer to handle error if port is already in use. - - :param protocol: Return free port for given protocol. Supported protocols - are 'tcp' and 'udp'. - """ - if protocol == constants.PROTO_NAME_TCP: - param = '-tna' - elif protocol == constants.PROTO_NAME_UDP: - param = '-una' - else: - raise ValueError("Unsupported procotol %s" % protocol) - - ip_wrapper = ip_lib.IPWrapper(namespace=namespace) - output = ip_wrapper.netns.execute(['ss', param]) - used_ports = _get_source_ports_from_ss_output(output) - - return get_unused_port(used_ports) - - -def _get_source_ports_from_ss_output(output): - ports = set() - for line in output.splitlines(): - match = SS_SOURCE_PORT_PATTERN.match(line) - if match: - ports.add(match.group('port')) - return ports - - -def get_unused_port(used, start=1024, end=65535): - candidates = set(range(start, end + 1)) - return random.choice(list(candidates - used)) - - -class RootHelperProcess(subprocess.Popen): - def __init__(self, cmd, *args, **kwargs): - for arg in ('stdin', 'stdout', 'stderr'): - kwargs.setdefault(arg, subprocess.PIPE) - self.namespace = kwargs.pop('namespace', None) - self.cmd = cmd - if self.namespace is not None: - cmd = ['ip', 'netns', 'exec', self.namespace] + cmd - root_helper = config.get_root_helper(utils.cfg.CONF) - cmd = shlex.split(root_helper) + cmd - self.child_pid = None - super(RootHelperProcess, self).__init__(cmd, *args, **kwargs) - self._wait_for_child_process() - - def kill(self): - pid = self.child_pid or str(self.pid) - utils.execute(['kill', '-9', pid], run_as_root=True) - - def read_stdout(self, timeout=None): - return self._read_stream(self.stdout, timeout) - - @staticmethod - def _read_stream(stream, timeout): - if timeout: - poller = select.poll() - poller.register(stream.fileno()) - poll_predicate = functools.partial(poller.poll, 1) - utils.wait_until_true(poll_predicate, timeout, 0.1, - RuntimeError( - 'No output in %.2f seconds' % timeout)) - return stream.readline() - - def writeline(self, data): - self.stdin.write(data + os.linesep) - self.stdin.flush() - - def _wait_for_child_process(self, timeout=CHILD_PROCESS_TIMEOUT, - sleep=CHILD_PROCESS_SLEEP): - def child_is_running(): - child_pid = utils.get_root_helper_child_pid( - self.pid, run_as_root=True) - if utils.pid_invoked_with_cmdline(child_pid, self.cmd): - return True - - utils.wait_until_true( - child_is_running, - timeout, - exception=RuntimeError("Process %s hasn't been spawned " - "in %d seconds" % (self.cmd, timeout))) - self.child_pid = utils.get_root_helper_child_pid( - self.pid, run_as_root=True) - - -class NetcatTester(object): - TESTING_STRING = 'foo' - TCP = constants.PROTO_NAME_TCP - UDP = constants.PROTO_NAME_UDP - - def __init__(self, client_namespace, server_namespace, address, - dst_port, protocol, server_address='0.0.0.0', src_port=None): - """ - Tool for testing connectivity on transport layer using netcat - executable. - - The processes are spawned lazily. - - :param client_namespace: Namespace in which netcat process that - connects to other netcat will be spawned - :param server_namespace: Namespace in which listening netcat process - will be spawned - :param address: Server address from client point of view - :param dst_port: Port on which netcat listens - :param protocol: Transport protocol, either 'tcp' or 'udp' - :param server_address: Address in server namespace on which netcat - should listen - :param src_port: Source port of netcat process spawned in client - namespace - packet will have src_port in TCP/UDP - header with this value - - """ - self.client_namespace = client_namespace - self.server_namespace = server_namespace - self._client_process = None - self._server_process = None - self.address = address - self.server_address = server_address - self.dst_port = str(dst_port) - self.src_port = str(src_port) if src_port else None - if protocol not in TRANSPORT_PROTOCOLS: - raise ValueError("Unsupported protocol %s" % protocol) - self.protocol = protocol - - @property - def client_process(self): - if not self._client_process: - self.establish_connection() - return self._client_process - - @property - def server_process(self): - if not self._server_process: - self._spawn_server_process() - return self._server_process - - def _spawn_server_process(self): - self._server_process = self._spawn_nc_in_namespace( - self.server_namespace, - address=self.server_address, - listen=True) - - def establish_connection(self): - if self._client_process: - raise RuntimeError('%(proto)s connection to $(ip_addr)s is already' - ' established' % - {'proto': self.protocol, - 'ip_addr': self.address}) - - if not self._server_process: - self._spawn_server_process() - self._client_process = self._spawn_nc_in_namespace( - self.client_namespace, - address=self.address) - if self.protocol == self.UDP: - # Create an entry in conntrack table for UDP packets - self.client_process.writeline(self.TESTING_STRING) - - def test_connectivity(self, respawn=False): - stop_required = (respawn and self._client_process and - self._client_process.poll() is not None) - if stop_required: - self.stop_processes() - - self.client_process.writeline(self.TESTING_STRING) - message = self.server_process.read_stdout(READ_TIMEOUT).strip() - self.server_process.writeline(message) - message = self.client_process.read_stdout(READ_TIMEOUT).strip() - - return message == self.TESTING_STRING - - def _spawn_nc_in_namespace(self, namespace, address, listen=False): - cmd = ['nc', address, self.dst_port] - if self.protocol == self.UDP: - cmd.append('-u') - if listen: - cmd.append('-l') - if self.protocol == self.TCP: - cmd.append('-k') - else: - cmd.extend(['-w', '20']) - if self.src_port: - cmd.extend(['-p', self.src_port]) - proc = RootHelperProcess(cmd, namespace=namespace) - return proc - - def stop_processes(self): - for proc_attr in ('_client_process', '_server_process'): - proc = getattr(self, proc_attr) - if proc: - if proc.poll() is None: - proc.kill() - proc.wait() - setattr(self, proc_attr, None) diff --git a/neutron/tests/functional/agent/linux/test_helpers.py b/neutron/tests/functional/agent/linux/test_helpers.py deleted file mode 100644 index a027245d4c0..00000000000 --- a/neutron/tests/functional/agent/linux/test_helpers.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2014 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.tests.functional.agent.linux import helpers -from neutron.tests.functional import base - - -class TestRootHelperProcess(base.BaseSudoTestCase): - - def test_process_read_write(self): - proc = helpers.RootHelperProcess(['tee']) - proc.writeline('foo') - output = proc.read_stdout(helpers.READ_TIMEOUT) - self.assertEqual('foo\n', output) - - def test_process_kill(self): - with self.assert_max_execution_time(100): - proc = helpers.RootHelperProcess(['tee']) - proc.kill() - proc.wait() - # sudo returns 137 and - # rootwrap returns 247 (bug 1364822) - self.assertIn(proc.returncode, [137, 247]) diff --git a/neutron/tests/functional/agent/linux/test_iptables.py b/neutron/tests/functional/agent/linux/test_iptables.py index 3d78459aee0..2130ec8ccd4 100644 --- a/neutron/tests/functional/agent/linux/test_iptables.py +++ b/neutron/tests/functional/agent/linux/test_iptables.py @@ -24,7 +24,6 @@ from neutron.tests.common import machine_fixtures from neutron.tests.common import net_helpers from neutron.tests.functional.agent.linux import base as linux_base from neutron.tests.functional.agent.linux.bin import ipt_binname -from neutron.tests.functional.agent.linux import helpers from neutron.tests.functional import base as functional_base @@ -44,8 +43,8 @@ class IptablesManagerTestCase(functional_base.BaseSudoTestCase): self.client_fw, self.server_fw = self.create_firewalls() # The port is used in isolated namespace that precludes possibility of # port conflicts - self.port = helpers.get_free_namespace_port(constants.PROTO_NAME_TCP, - self.server.namespace) + self.port = net_helpers.get_free_namespace_port( + constants.PROTO_NAME_TCP, self.server.namespace) def create_firewalls(self): client_iptables = iptables_manager.IptablesManager( @@ -80,7 +79,7 @@ class IptablesManagerTestCase(functional_base.BaseSudoTestCase): return chain, rule def _test_with_nc(self, fw_manager, direction, port, protocol): - netcat = helpers.NetcatTester( + netcat = net_helpers.NetcatTester( self.client.namespace, self.server.namespace, self.server.ip, self.port, protocol) self.addCleanup(netcat.stop_processes) @@ -121,35 +120,35 @@ class IptablesManagerTestCase(functional_base.BaseSudoTestCase): def test_tcp_input_port(self): self._test_with_nc(self.server_fw, 'ingress', self.port, - protocol=helpers.NetcatTester.TCP) + protocol=net_helpers.NetcatTester.TCP) def test_tcp_output_port(self): self._test_with_nc(self.client_fw, 'egress', self.port, - protocol=helpers.NetcatTester.TCP) + protocol=net_helpers.NetcatTester.TCP) def test_tcp_input(self): self._test_with_nc(self.server_fw, 'ingress', port=None, - protocol=helpers.NetcatTester.TCP) + protocol=net_helpers.NetcatTester.TCP) def test_tcp_output(self): self._test_with_nc(self.client_fw, 'egress', port=None, - protocol=helpers.NetcatTester.TCP) + protocol=net_helpers.NetcatTester.TCP) def test_udp_input_port(self): self._test_with_nc(self.server_fw, 'ingress', self.port, - protocol=helpers.NetcatTester.UDP) + protocol=net_helpers.NetcatTester.UDP) def test_udp_output_port(self): self._test_with_nc(self.client_fw, 'egress', self.port, - protocol=helpers.NetcatTester.UDP) + protocol=net_helpers.NetcatTester.UDP) def test_udp_input(self): self._test_with_nc(self.server_fw, 'ingress', port=None, - protocol=helpers.NetcatTester.UDP) + protocol=net_helpers.NetcatTester.UDP) def test_udp_output(self): self._test_with_nc(self.client_fw, 'egress', port=None, - protocol=helpers.NetcatTester.UDP) + protocol=net_helpers.NetcatTester.UDP) class IptablesManagerNonRootTestCase(base.BaseTestCase): diff --git a/neutron/tests/functional/agent/test_l3_agent.py b/neutron/tests/functional/agent/test_l3_agent.py index b35fb074a22..00412968325 100644 --- a/neutron/tests/functional/agent/test_l3_agent.py +++ b/neutron/tests/functional/agent/test_l3_agent.py @@ -400,8 +400,8 @@ class L3AgentTestCase(L3AgentTestFramework): router_info = self.generate_router_info(enable_ha=False) router = self.manage_router(self.agent, router_info) - port = helpers.get_free_namespace_port(l3_constants.PROTO_NAME_TCP, - router.ns_name) + port = net_helpers.get_free_namespace_port(l3_constants.PROTO_NAME_TCP, + router.ns_name) client_address = '19.4.4.3' server_address = '35.4.0.4' @@ -413,9 +413,9 @@ class L3AgentTestCase(L3AgentTestFramework): router.process(self.agent) router_ns = ip_lib.IPWrapper(namespace=router.ns_name) - netcat = helpers.NetcatTester(router.ns_name, router.ns_name, - client_address, port, - protocol=helpers.NetcatTester.TCP) + netcat = net_helpers.NetcatTester( + router.ns_name, router.ns_name, client_address, port, + protocol=net_helpers.NetcatTester.TCP) self.addCleanup(netcat.stop_processes) def assert_num_of_conntrack_rules(n): @@ -705,13 +705,13 @@ class L3AgentTestCase(L3AgentTestFramework): self._add_fip(router, dst_fip, fixed_address=dst_machine.ip) router.process(self.agent) - protocol_port = helpers.get_free_namespace_port( + protocol_port = net_helpers.get_free_namespace_port( l3_constants.PROTO_NAME_TCP, dst_machine.namespace) # client sends to fip - netcat = helpers.NetcatTester( + netcat = net_helpers.NetcatTester( src_machine.namespace, dst_machine.namespace, dst_fip, protocol_port, - protocol=helpers.NetcatTester.TCP) + protocol=net_helpers.NetcatTester.TCP) self.addCleanup(netcat.stop_processes) self.assertTrue(netcat.test_connectivity())