diff --git a/neutron/tests/functional/agent/linux/bin/__init__.py b/neutron/tests/functional/agent/linux/bin/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/functional/agent/linux/bin/ipt_binname.py b/neutron/tests/functional/agent/linux/bin/ipt_binname.py new file mode 100755 index 00000000000..79bd9be0a57 --- /dev/null +++ b/neutron/tests/functional/agent/linux/bin/ipt_binname.py @@ -0,0 +1,37 @@ +#! /usr/bin/env python + +# Copyright (C) 2014 VA Linux Systems Japan K.K. +# Copyright (C) 2014 YAMAMOTO Takashi +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import print_function +import sys + +import eventlet + + +def print_binary_name(): + # NOTE(yamamoto): Don't move this import to module-level. + # The aim is to test importing from eventlet non-main thread. + # See Bug #1367075 for details. + from neutron.agent.linux import iptables_manager + + print(iptables_manager.binary_name) + +if __name__ == "__main__": + if 'spawn' in sys.argv: + eventlet.spawn(print_binary_name).wait() + else: + print_binary_name() diff --git a/neutron/tests/functional/agent/linux/test_iptables.py b/neutron/tests/functional/agent/linux/test_iptables.py index eb9c828a5f5..e3805897127 100644 --- a/neutron/tests/functional/agent/linux/test_iptables.py +++ b/neutron/tests/functional/agent/linux/test_iptables.py @@ -12,14 +12,19 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import os.path + import testtools from neutron.agent.linux import iptables_manager -from neutron.tests.functional.agent.linux import base +from neutron.agent.linux import utils +from neutron.tests import base +from neutron.tests.functional.agent.linux import base as linux_base +from neutron.tests.functional.agent.linux.bin import ipt_binname from neutron.tests.functional.agent.linux import helpers -class IptablesManagerTestCase(base.BaseIPVethTestCase): +class IptablesManagerTestCase(linux_base.BaseIPVethTestCase): DIRECTION_CHAIN_MAPPER = {'ingress': 'INPUT', 'egress': 'OUTPUT'} PROTOCOL_BLOCK_RULE = '-p %s -j DROP' @@ -85,25 +90,28 @@ class IptablesManagerTestCase(base.BaseIPVethTestCase): def test_icmp(self): pinger = helpers.Pinger(self.client_ns) pinger.assert_ping(self.DST_ADDRESS) - self.server_fw.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE) + self.server_fw.ipv4['filter'].add_rule('INPUT', + linux_base.ICMP_BLOCK_RULE) self.server_fw.apply() pinger.assert_no_ping(self.DST_ADDRESS) self.server_fw.ipv4['filter'].remove_rule('INPUT', - base.ICMP_BLOCK_RULE) + linux_base.ICMP_BLOCK_RULE) self.server_fw.apply() pinger.assert_ping(self.DST_ADDRESS) def test_mangle_icmp(self): pinger = helpers.Pinger(self.client_ns) pinger.assert_ping(self.DST_ADDRESS) - self.server_fw.ipv4['mangle'].add_rule('INPUT', base.ICMP_MARK_RULE) - self.server_fw.ipv4['filter'].add_rule('INPUT', base.MARKED_BLOCK_RULE) + self.server_fw.ipv4['mangle'].add_rule('INPUT', + linux_base.ICMP_MARK_RULE) + self.server_fw.ipv4['filter'].add_rule('INPUT', + linux_base.MARKED_BLOCK_RULE) self.server_fw.apply() pinger.assert_no_ping(self.DST_ADDRESS) self.server_fw.ipv4['mangle'].remove_rule('INPUT', - base.ICMP_MARK_RULE) + linux_base.ICMP_MARK_RULE) self.server_fw.ipv4['filter'].remove_rule('INPUT', - base.MARKED_BLOCK_RULE) + linux_base.MARKED_BLOCK_RULE) self.server_fw.apply() pinger.assert_ping(self.DST_ADDRESS) @@ -130,3 +138,24 @@ class IptablesManagerTestCase(base.BaseIPVethTestCase): def test_udp_output(self): self._test_with_nc(self.client_fw, 'egress', port=None, udp=True) + + +class IptablesManagerNonRootTestCase(base.BaseTestCase): + @staticmethod + def _normalize_module_name(name): + for suf in ['.pyc', '.pyo']: + if name.endswith(suf): + return name[:-len(suf)] + '.py' + return name + + def _test_binary_name(self, module, *extra_options): + executable = self._normalize_module_name(module.__file__) + expected = os.path.basename(executable)[:16] + observed = utils.execute([executable] + list(extra_options)).rstrip() + self.assertEqual(expected, observed) + + def test_binary_name(self): + self._test_binary_name(ipt_binname) + + def test_binary_name_eventlet_spawn(self): + self._test_binary_name(ipt_binname, 'spawn')