Add firewall blink + remote SG functional tests

This tests that firewall still does its purpose even when rules are
being updated. That means there is no short period of time where
security groups are inactive during update.

Part of this patch introduces Pinger class. This object provides
capability of sending ICMP packets asynchronously and after
it's stopped it provides statistics like how many packets were
sent and how many were received. Note the difference between
assert_ping() functions, which are synchronous.

Another testing of remote security groups is also added.

Related-bug: #1461000
Change-Id: I6251ee264396f8dbc9b284758b96e5cdc6ac500b
This commit is contained in:
Jakub Libosvar 2015-06-30 12:46:30 +00:00
parent a94777a005
commit a459950da3
3 changed files with 154 additions and 0 deletions

View File

@ -59,11 +59,14 @@ class ConnectionTester(fixtures.Fixture):
self.ICMP: self._test_icmp_connectivity,
self.ARP: self._test_arp_connectivity}
self._nc_testers = dict()
self._pingers = dict()
self.addCleanup(self.cleanup)
def cleanup(self):
for nc in self._nc_testers.values():
nc.stop_processes()
for pinger in self._pingers.values():
pinger.stop()
@property
def vm_namespace(self):
@ -89,6 +92,14 @@ class ConnectionTester(fixtures.Fixture):
def vm_mac_address(self, mac_address):
self._vm.mac_address = mac_address
@property
def peer_mac_address(self):
return self._peer.port.link.address
@peer_mac_address.setter
def peer_mac_address(self, mac_address):
self._peer.mac_address = mac_address
@property
def peer_namespace(self):
return self._peer.namespace
@ -238,6 +249,32 @@ class ConnectionTester(fixtures.Fixture):
self._nc_testers[nc_key] = nc_tester
return nc_tester
def _get_pinger(self, direction):
try:
pinger = self._pingers[direction]
except KeyError:
src_namespace, dst_address = self._get_namespace_and_address(
direction)
pinger = net_helpers.Pinger(src_namespace, dst_address)
self._pingers[direction] = pinger
return pinger
def start_sending_icmp(self, direction):
pinger = self._get_pinger(direction)
pinger.start()
def stop_sending_icmp(self, direction):
pinger = self._get_pinger(direction)
pinger.stop()
def get_sent_icmp_packets(self, direction):
pinger = self._get_pinger(direction)
return pinger.sent
def get_received_icmp_packets(self, direction):
pinger = self._get_pinger(direction)
return pinger.received
class LinuxBridgeConnectionTester(ConnectionTester):
"""Tester with linux bridge in the middle
@ -261,6 +298,10 @@ class LinuxBridgeConnectionTester(ConnectionTester):
def vm_port_id(self):
return net_helpers.VethFixture.get_peer_name(self._vm.port.name)
@property
def peer_port_id(self):
return net_helpers.VethFixture.get_peer_name(self._peer.port.name)
def flush_arp_tables(self):
self._bridge.neigh.flush(4, 'all')
super(LinuxBridgeConnectionTester, self).flush_arp_tables()

View File

@ -252,6 +252,75 @@ class RootHelperProcess(subprocess.Popen):
self.child_pid = utils.get_root_helper_child_pid(
self.pid, run_as_root=True)
@property
def is_running(self):
return self.poll() is None
class Pinger(object):
"""Class for sending ICMP packets asynchronously
The aim is to keep sending ICMP packets on background while executing other
code. After background 'ping' command is stopped, statistics are available.
Difference to assert_(no_)ping() functions located in this module is that
these methods send given count of ICMP packets while they wait for the
exit code of 'ping' command.
>>> pinger = Pinger('pinger_test', '192.168.0.2')
>>> pinger.start(); time.sleep(5); pinger.stop()
>>> pinger.sent, pinger.received
7 7
"""
stats_pattern = re.compile(
r'^(?P<trans>\d+) packets transmitted,.*(?P<recv>\d+) received.*$')
TIMEOUT = 15
def __init__(self, namespace, address, count=None, timeout=1):
self.proc = None
self.namespace = namespace
self.address = address
self.count = count
self.timeout = timeout
self.sent = 0
self.received = 0
def _wait_for_death(self):
is_dead = lambda: self.proc.poll() is not None
utils.wait_until_true(
is_dead, timeout=self.TIMEOUT, exception=RuntimeError(
"Ping command hasn't ended after %d seconds." % self.TIMEOUT))
def _parse_stats(self):
for line in self.proc.stdout:
result = self.stats_pattern.match(line)
if result:
self.sent = int(result.group('trans'))
self.received = int(result.group('recv'))
break
else:
raise RuntimeError("Didn't find ping statistics.")
def start(self):
if self.proc and self.proc.is_running:
raise RuntimeError("This pinger has already a running process")
ip_version = ip_lib.get_ip_version(self.address)
ping_exec = 'ping' if ip_version == 4 else 'ping6'
cmd = [ping_exec, self.address, '-W', str(self.timeout)]
if self.count:
cmd.extend(['-c', str(self.count)])
self.proc = RootHelperProcess(cmd, namespace=self.namespace)
def stop(self):
if self.proc and self.proc.is_running:
self.proc.kill(signal.SIGINT)
self._wait_for_death()
self._parse_stats()
class NetcatTester(object):
TCP = n_const.PROTO_NAME_TCP

View File

@ -422,3 +422,47 @@ class FirewallTestCase(base.BaseSudoTestCase):
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, list())
self.tester.assert_established_connection(**connection)
def test_preventing_firewall_blink(self):
direction = self.tester.INGRESS
sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress',
'protocol': 'tcp'}]
self.tester.start_sending_icmp(direction)
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, {})
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
self.tester.stop_sending_icmp(direction)
packets_sent = self.tester.get_sent_icmp_packets(direction)
packets_received = self.tester.get_received_icmp_packets(direction)
self.assertGreater(packets_sent, 0)
self.assertEqual(0, packets_received)
def test_remote_security_groups(self):
remote_sg_id = 'remote_sg_id'
peer_port_desc = self._create_port_description(
self.tester.peer_port_id,
[self.tester.peer_ip_address],
self.tester.peer_mac_address,
[remote_sg_id])
self.firewall.prepare_port_filter(peer_port_desc)
peer_sg_rules = [{'ethertype': 'IPv4', 'direction': 'egress',
'protocol': 'icmp'}]
self._apply_security_group_rules(remote_sg_id, peer_sg_rules)
vm_sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress',
'protocol': 'icmp', 'remote_group_id': remote_sg_id}]
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID,
vm_sg_rules)
vm_sg_members = {'IPv4': [self.tester.peer_ip_address]}
with self.firewall.defer_apply():
self.firewall.update_security_group_members(
remote_sg_id, vm_sg_members)
self.tester.assert_connection(protocol=self.tester.ICMP,
direction=self.tester.INGRESS)
self.tester.assert_no_connection(protocol=self.tester.TCP,
direction=self.tester.INGRESS)
self.tester.assert_no_connection(protocol=self.tester.ICMP,
direction=self.tester.EGRESS)