Clark Boylan 53522910fb Add firewall behavior assertions to testinfra testing
This attempts to exercise our firewall rules externally via the bridge
host in our testinfra testing. If we like this style of rule we can add
a number of tests for various firewall behaviors that we want to ensure.

Change-Id: I4ee63bc6f15af9b68fc1c690c5d92f4bf9c756c3
2021-12-15 16:36:45 -08:00

137 lines
4.7 KiB
Python

# Copyright 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import time
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import TimeoutException
def take_screenshots(host, shots):
"""Take screenshots
host: the testinfra host info of the remote host where selenium is
running
shots: a list where each element is a list consisting of
* (str) URL to screenshot
* (str) Javascript to execute before shot, None to skip
* (str) filename.png, will be placed in /var/log/screenshots for collection
"""
driver = webdriver.Remote(
command_executor='http://%s:4444/wd/hub' % (host.backend.get_hostname()),
desired_capabilities=webdriver.DesiredCapabilities.FIREFOX)
try:
for url, execute, png in shots:
driver.get(url)
WebDriverWait(driver, 30).until(
lambda driver: driver.execute_script(
'return document.readyState') == 'complete')
if execute:
time.sleep(5)
driver.execute_script(execute)
time.sleep(5)
# NOTE(ianw) This is a mash-up of things I found on
# stackoverflow and other bits googling "full size
# screenshot". You expand the viewport and take a
# shot of the <body> element so that you don't also
# get scrollbars in the shot, with some tweaking
# because the window size. Apparently selinum 4
# will have getFullPageScreeshotAs, so we should switch
# to that when available.
original_size = driver.get_window_size()
required_width = driver.execute_script(
'return document.body.parentNode.scrollWidth')
required_height = driver.execute_script(
'return document.body.parentNode.scrollHeight') + 100
driver.set_window_size(required_width, required_height)
driver.find_element_by_tag_name('body'). \
screenshot("/var/log/screenshots/%s" % png)
driver.set_window_size(
original_size['width'], original_size['height'])
except TimeoutException as e:
raise e
finally:
driver.quit()
def get_ips(value, family=None):
ret = set()
try:
addr_info = socket.getaddrinfo(value, None, family)
except socket.gaierror:
return ret
for addr in addr_info:
ret.add(addr[4][0])
return ret
def verify_iptables(host):
rules = host.iptables.rules()
rules = [x.strip() for x in rules]
print('Comparing against rules:\n%s' % rules)
needed_rules = [
'-P INPUT ACCEPT',
'-P FORWARD DROP',
'-P OUTPUT ACCEPT',
'-N openstack-INPUT',
'-A INPUT -j openstack-INPUT',
'-A openstack-INPUT -i lo -j ACCEPT',
'-A openstack-INPUT -p icmp -m icmp --icmp-type any -j ACCEPT',
'-A openstack-INPUT -m state --state RELATED,ESTABLISHED -j ACCEPT',
'-A openstack-INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT',
'-A openstack-OUTPUT -p tcp -m tcp --dport 25 --tcp-flags FIN,SYN,RST,ACK SYN -j REJECT --reject-with tcp-reset',
'-A openstack-INPUT -j REJECT --reject-with icmp-admin-prohibited'
]
for rule in needed_rules:
assert rule in rules
# Ensure all IPv4+6 addresses for cacti are allowed
for ip in get_ips('cacti.openstack.org', socket.AF_INET):
snmp = ('-A openstack-INPUT -s %s/32 -p udp -m udp'
' --dport 161 -j ACCEPT' % ip)
assert snmp in rules
# TODO(ianw) add ip6tables support to testinfra iptables module
ip6rules = host.check_output('ip6tables -S')
for ip in get_ips('cacti.openstack.org', socket.AF_INET6):
snmp = ('-A openstack-INPUT -s %s/128 -p udp -m udp'
' --dport 161 -j ACCEPT' % ip)
assert snmp in ip6rules
return rules
def check_unreachable(addr, port, errno=113):
# errno 113 is no route to host
try:
s = socket.create_connection((addr, port), timeout=10)
except OSError as e:
# No route to host
assert e.errno == errno
else:
s.close()
# We should always error.
assert False