Break Pinger class to functions
As the class served only for storing parameters that can be passed as actual function parameters, there is no reason for class. Change-Id: I553b4d6daeb78d495cda09894582a3d885b5d1b5
This commit is contained in:
parent
e601f26755
commit
753196480d
|
@ -12,39 +12,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import fixtures
|
||||
import netaddr
|
||||
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests import tools
|
||||
|
||||
|
||||
class Pinger(object):
|
||||
def __init__(self, namespace, timeout=1, max_attempts=1):
|
||||
self.namespace = namespace
|
||||
self._timeout = timeout
|
||||
self._max_attempts = max_attempts
|
||||
|
||||
def _ping_destination(self, dest_address):
|
||||
ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
|
||||
ipversion = netaddr.IPAddress(dest_address).version
|
||||
ping_command = 'ping' if ipversion == 4 else 'ping6'
|
||||
ns_ip_wrapper.netns.execute([ping_command, '-c', self._max_attempts,
|
||||
'-W', self._timeout, dest_address])
|
||||
|
||||
def assert_ping(self, dst_ip):
|
||||
self._ping_destination(dst_ip)
|
||||
|
||||
def assert_no_ping(self, dst_ip):
|
||||
try:
|
||||
self._ping_destination(dst_ip)
|
||||
tools.fail("destination ip %(dst_ip)s is replying to ping "
|
||||
"from namespace %(ns)s, but it shouldn't" %
|
||||
{'ns': self.namespace, 'dst_ip': dst_ip})
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
|
||||
class FakeMachine(fixtures.Fixture):
|
||||
|
@ -89,12 +60,10 @@ class FakeMachine(fixtures.Fixture):
|
|||
return ns_ip_wrapper.netns.execute(*args, **kwargs)
|
||||
|
||||
def assert_ping(self, dst_ip):
|
||||
pinger = Pinger(self.namespace)
|
||||
pinger.assert_ping(dst_ip)
|
||||
net_helpers.assert_ping(self.namespace, dst_ip)
|
||||
|
||||
def assert_no_ping(self, dst_ip):
|
||||
pinger = Pinger(self.namespace)
|
||||
pinger.assert_no_ping(dst_ip)
|
||||
net_helpers.assert_no_ping(self.namespace, dst_ip)
|
||||
|
||||
|
||||
class PeerMachines(fixtures.Fixture):
|
||||
|
|
|
@ -62,6 +62,25 @@ def set_namespace_gateway(port_dev, gateway_ip):
|
|||
port_dev.route.add_gateway(gateway_ip)
|
||||
|
||||
|
||||
def assert_ping(src_namespace, dst_ip, timeout=1, count=1):
|
||||
ipversion = netaddr.IPAddress(dst_ip).version
|
||||
ping_command = 'ping' if ipversion == 4 else 'ping6'
|
||||
ns_ip_wrapper = ip_lib.IPWrapper(src_namespace)
|
||||
ns_ip_wrapper.netns.execute([ping_command, '-c', count, '-W', timeout,
|
||||
dst_ip])
|
||||
|
||||
|
||||
def assert_no_ping(src_namespace, dst_ip, timeout=1, count=1):
|
||||
try:
|
||||
assert_ping(src_namespace, dst_ip, timeout, count)
|
||||
except RuntimeError:
|
||||
pass
|
||||
else:
|
||||
tools.fail("destination ip %(destination)s is replying to ping from "
|
||||
"namespace %(ns)s, but it shouldn't" %
|
||||
{'ns': src_namespace, 'destination': dst_ip})
|
||||
|
||||
|
||||
class NamespaceFixture(fixtures.Fixture):
|
||||
"""Create a namespace.
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ from neutron.agent.linux import ip_lib
|
|||
from neutron.cmd.sanity import checks
|
||||
from neutron.plugins.openvswitch.agent import ovs_neutron_agent as ovsagt
|
||||
from neutron.plugins.openvswitch.common import constants
|
||||
from neutron.tests.common import machine_fixtures
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional.agent import test_ovs_lib
|
||||
from neutron.tests.functional import base
|
||||
|
@ -98,8 +97,6 @@ class _ARPSpoofTestCase(object):
|
|||
net_helpers.NamespaceFixture()).name
|
||||
self.dst_namespace = self.useFixture(
|
||||
net_helpers.NamespaceFixture()).name
|
||||
self.pinger = machine_fixtures.Pinger(
|
||||
self.src_namespace, max_attempts=2)
|
||||
self.src_p = self.useFixture(
|
||||
net_helpers.OVSPortFixture(self.br, self.src_namespace)).port
|
||||
self.dst_p = self.useFixture(
|
||||
|
@ -112,7 +109,7 @@ class _ARPSpoofTestCase(object):
|
|||
self._setup_arp_spoof_for_port(self.dst_p.name, [self.dst_addr])
|
||||
self.src_p.addr.add('%s/24' % self.src_addr)
|
||||
self.dst_p.addr.add('%s/24' % self.dst_addr)
|
||||
self.pinger.assert_ping(self.dst_addr)
|
||||
net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2)
|
||||
|
||||
def test_arp_spoof_doesnt_block_ipv6(self):
|
||||
self.src_addr = '2000::1'
|
||||
|
@ -124,7 +121,7 @@ class _ARPSpoofTestCase(object):
|
|||
# make sure the IPv6 addresses are ready before pinging
|
||||
self.src_p.addr.wait_until_address_ready(self.src_addr)
|
||||
self.dst_p.addr.wait_until_address_ready(self.dst_addr)
|
||||
self.pinger.assert_ping(self.dst_addr)
|
||||
net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2)
|
||||
|
||||
def test_arp_spoof_blocks_response(self):
|
||||
# this will prevent the destination from responding to the ARP
|
||||
|
@ -132,7 +129,7 @@ class _ARPSpoofTestCase(object):
|
|||
self._setup_arp_spoof_for_port(self.dst_p.name, ['192.168.0.3'])
|
||||
self.src_p.addr.add('%s/24' % self.src_addr)
|
||||
self.dst_p.addr.add('%s/24' % self.dst_addr)
|
||||
self.pinger.assert_no_ping(self.dst_addr)
|
||||
net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2)
|
||||
|
||||
def test_arp_spoof_blocks_request(self):
|
||||
# this will prevent the source from sending an ARP
|
||||
|
@ -154,7 +151,7 @@ class _ARPSpoofTestCase(object):
|
|||
self.dst_addr])
|
||||
self.src_p.addr.add('%s/24' % self.src_addr)
|
||||
self.dst_p.addr.add('%s/24' % self.dst_addr)
|
||||
self.pinger.assert_ping(self.dst_addr)
|
||||
net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2)
|
||||
|
||||
def test_arp_spoof_disable_port_security(self):
|
||||
# block first and then disable port security to make sure old rules
|
||||
|
@ -164,7 +161,7 @@ class _ARPSpoofTestCase(object):
|
|||
psec=False)
|
||||
self.src_p.addr.add('%s/24' % self.src_addr)
|
||||
self.dst_p.addr.add('%s/24' % self.dst_addr)
|
||||
self.pinger.assert_ping(self.dst_addr)
|
||||
net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2)
|
||||
|
||||
def _setup_arp_spoof_for_port(self, port, addrs, psec=True):
|
||||
of_port_map = self.br.get_vif_port_to_ofport_map()
|
||||
|
|
Loading…
Reference in New Issue