53522910fb
This attempts to exercise our firewall rules externally via the bridge host in our testinfra testing. If we like this style of rule we can add a number of tests for various firewall behaviors that we want to ensure. Change-Id: I4ee63bc6f15af9b68fc1c690c5d92f4bf9c756c3
137 lines
4.7 KiB
Python
137 lines
4.7 KiB
Python
# Copyright 2018 Red Hat, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import socket
|
|
import time
|
|
|
|
from selenium import webdriver
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.common.exceptions import TimeoutException
|
|
|
|
def take_screenshots(host, shots):
|
|
"""Take screenshots
|
|
|
|
host: the testinfra host info of the remote host where selenium is
|
|
running
|
|
shots: a list where each element is a list consisting of
|
|
|
|
* (str) URL to screenshot
|
|
* (str) Javascript to execute before shot, None to skip
|
|
* (str) filename.png, will be placed in /var/log/screenshots for collection
|
|
"""
|
|
driver = webdriver.Remote(
|
|
command_executor='http://%s:4444/wd/hub' % (host.backend.get_hostname()),
|
|
desired_capabilities=webdriver.DesiredCapabilities.FIREFOX)
|
|
|
|
try:
|
|
for url, execute, png in shots:
|
|
driver.get(url)
|
|
WebDriverWait(driver, 30).until(
|
|
lambda driver: driver.execute_script(
|
|
'return document.readyState') == 'complete')
|
|
|
|
if execute:
|
|
time.sleep(5)
|
|
driver.execute_script(execute)
|
|
|
|
time.sleep(5)
|
|
|
|
# NOTE(ianw) This is a mash-up of things I found on
|
|
# stackoverflow and other bits googling "full size
|
|
# screenshot". You expand the viewport and take a
|
|
# shot of the <body> element so that you don't also
|
|
# get scrollbars in the shot, with some tweaking
|
|
# because the window size. Apparently selinum 4
|
|
# will have getFullPageScreeshotAs, so we should switch
|
|
# to that when available.
|
|
original_size = driver.get_window_size()
|
|
required_width = driver.execute_script(
|
|
'return document.body.parentNode.scrollWidth')
|
|
required_height = driver.execute_script(
|
|
'return document.body.parentNode.scrollHeight') + 100
|
|
driver.set_window_size(required_width, required_height)
|
|
|
|
driver.find_element_by_tag_name('body'). \
|
|
screenshot("/var/log/screenshots/%s" % png)
|
|
|
|
driver.set_window_size(
|
|
original_size['width'], original_size['height'])
|
|
|
|
except TimeoutException as e:
|
|
raise e
|
|
finally:
|
|
driver.quit()
|
|
|
|
|
|
|
|
def get_ips(value, family=None):
|
|
ret = set()
|
|
try:
|
|
addr_info = socket.getaddrinfo(value, None, family)
|
|
except socket.gaierror:
|
|
return ret
|
|
for addr in addr_info:
|
|
ret.add(addr[4][0])
|
|
return ret
|
|
|
|
|
|
def verify_iptables(host):
|
|
rules = host.iptables.rules()
|
|
rules = [x.strip() for x in rules]
|
|
print('Comparing against rules:\n%s' % rules)
|
|
|
|
needed_rules = [
|
|
'-P INPUT ACCEPT',
|
|
'-P FORWARD DROP',
|
|
'-P OUTPUT ACCEPT',
|
|
'-N openstack-INPUT',
|
|
'-A INPUT -j openstack-INPUT',
|
|
'-A openstack-INPUT -i lo -j ACCEPT',
|
|
'-A openstack-INPUT -p icmp -m icmp --icmp-type any -j ACCEPT',
|
|
'-A openstack-INPUT -m state --state RELATED,ESTABLISHED -j ACCEPT',
|
|
'-A openstack-INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT',
|
|
'-A openstack-OUTPUT -p tcp -m tcp --dport 25 --tcp-flags FIN,SYN,RST,ACK SYN -j REJECT --reject-with tcp-reset',
|
|
'-A openstack-INPUT -j REJECT --reject-with icmp-admin-prohibited'
|
|
]
|
|
for rule in needed_rules:
|
|
assert rule in rules
|
|
|
|
# Ensure all IPv4+6 addresses for cacti are allowed
|
|
for ip in get_ips('cacti.openstack.org', socket.AF_INET):
|
|
snmp = ('-A openstack-INPUT -s %s/32 -p udp -m udp'
|
|
' --dport 161 -j ACCEPT' % ip)
|
|
assert snmp in rules
|
|
|
|
# TODO(ianw) add ip6tables support to testinfra iptables module
|
|
ip6rules = host.check_output('ip6tables -S')
|
|
for ip in get_ips('cacti.openstack.org', socket.AF_INET6):
|
|
snmp = ('-A openstack-INPUT -s %s/128 -p udp -m udp'
|
|
' --dport 161 -j ACCEPT' % ip)
|
|
assert snmp in ip6rules
|
|
|
|
return rules
|
|
|
|
|
|
def check_unreachable(addr, port, errno=113):
|
|
# errno 113 is no route to host
|
|
try:
|
|
s = socket.create_connection((addr, port), timeout=10)
|
|
except OSError as e:
|
|
# No route to host
|
|
assert e.errno == errno
|
|
else:
|
|
s.close()
|
|
# We should always error.
|
|
assert False
|