Merge "Add source interface in "ping" command in `ARPSpoofTestCase`"

This commit is contained in:
Zuul 2024-09-13 14:22:01 +00:00 committed by Gerrit Code Review
commit e52d029911
2 changed files with 24 additions and 17 deletions

View File

@ -109,16 +109,19 @@ def set_namespace_gateway(port_dev, gateway_ip):
port_dev.route.add_gateway(gateway_ip)
def assert_ping(src_namespace, dst_ip, timeout=1, count=3, retry_count=1):
def assert_ping(src_namespace, dst_ip, timeout=1, count=3, retry_count=1,
device=None):
ipversion = netaddr.IPAddress(dst_ip).version
ping_command = 'ping' if ipversion == 4 else 'ping6'
ns_ip_wrapper = ip_lib.IPWrapper(src_namespace)
cmd = [ping_command, '-W', timeout, '-c', count]
if device:
cmd += ['-I', device]
cmd.append(dst_ip)
while retry_count:
retry_count -= 1
try:
ns_ip_wrapper.netns.execute(
[ping_command, '-W', timeout, '-c', count, dst_ip],
privsep_exec=True)
ns_ip_wrapper.netns.execute(cmd, privsep_exec=True)
return
except n_exc.ProcessExecutionError as exc:
if not retry_count:
@ -156,9 +159,9 @@ def async_ping(namespace, ips, timeout=1, count=10):
f.result()
def assert_no_ping(src_namespace, dst_ip, timeout=1, count=1):
def assert_no_ping(src_namespace, dst_ip, timeout=1, count=1, device=None):
try:
assert_ping(src_namespace, dst_ip, timeout, count)
assert_ping(src_namespace, dst_ip, timeout, count, device=device)
except RuntimeError:
pass
else:

View File

@ -127,7 +127,7 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
net_helpers.assert_ping(self.src_namespace, self.dst_addr,
retry_count=2)
retry_count=2, device=self.src_addr)
def test_mac_spoof_blocks_wrong_mac(self):
self._setup_arp_spoof_for_port(self.src_p.name, [self.src_addr])
@ -135,11 +135,12 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
net_helpers.assert_ping(self.src_namespace, self.dst_addr,
retry_count=2)
retry_count=2, device=self.src_addr)
# changing the allowed mac should stop the port from working
self._setup_arp_spoof_for_port(self.src_p.name, [self.src_addr],
mac='00:11:22:33:44:55')
net_helpers.assert_no_ping(self.src_namespace, self.dst_addr)
net_helpers.assert_no_ping(self.src_namespace, self.dst_addr,
device=self.src_addr)
def test_arp_spoof_doesnt_block_ipv6(self):
self.src_addr = '2000::1'
@ -152,7 +153,7 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self.src_p.addr.wait_until_address_ready(self.src_addr)
self.dst_p.addr.wait_until_address_ready(self.dst_addr)
net_helpers.assert_ping(self.src_namespace, self.dst_addr,
retry_count=2)
retry_count=2, device=self.src_addr)
def test_arp_spoof_blocks_response(self):
# this will prevent the destination from responding to the ARP
@ -160,7 +161,8 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self._setup_arp_spoof_for_port(self.dst_p.name, ['192.168.0.3'])
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2)
net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2,
device=self.src_addr)
def test_arp_spoof_blocks_icmpv6_neigh_advt(self):
self.src_addr = '2000::1'
@ -175,7 +177,8 @@ class ARPSpoofTestCase(OVSAgentTestBase):
# make sure the IPv6 addresses are ready before pinging
self.src_p.addr.wait_until_address_ready(self.src_addr)
self.dst_p.addr.wait_until_address_ready(self.dst_addr)
net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2)
net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2,
device=self.src_addr)
def test_arp_spoof_blocks_request(self):
# this will prevent the source from sending an ARP
@ -199,7 +202,7 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
net_helpers.assert_ping(self.src_namespace, self.dst_addr,
retry_count=2)
retry_count=2, device=self.src_addr)
def test_arp_spoof_icmpv6_neigh_advt_allowed_address_pairs(self):
self.src_addr = '2000::1'
@ -212,14 +215,15 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self.src_p.addr.wait_until_address_ready(self.src_addr)
self.dst_p.addr.wait_until_address_ready(self.dst_addr)
net_helpers.assert_ping(self.src_namespace, self.dst_addr,
retry_count=2)
retry_count=2, device=self.src_addr)
def test_arp_spoof_allowed_address_pairs_0cidr(self):
self._setup_arp_spoof_for_port(self.dst_p.name, ['9.9.9.9/0',
'1.2.3.4'])
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
net_helpers.assert_ping(self.src_namespace, self.dst_addr)
net_helpers.assert_ping(self.src_namespace, self.dst_addr,
device=self.src_addr)
def test_arp_spoof_disable_port_security(self):
# block first and then disable port security to make sure old rules
@ -230,7 +234,7 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
net_helpers.assert_ping(self.src_namespace, self.dst_addr,
retry_count=2)
retry_count=2, device=self.src_addr)
def test_arp_spoof_disable_network_port(self):
# block first and then disable port security to make sure old rules
@ -242,7 +246,7 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
net_helpers.assert_ping(self.src_namespace, self.dst_addr,
retry_count=2)
retry_count=2, device=self.src_addr)
def _setup_arp_spoof_for_port(self, port, addrs, psec=True,
device_owner='nobody', mac=None):