Now that all the bridge nodes are Jammy (3.10), we can uncap this dependency which will bring in the latest selenium. Unfortunately after investigation the easier way to do things I hoped this would allow doesn't work; comments are added and small updates for new API. Update the users file-match so they run too. Change-Id: I6a9d02bfc79b90417b1f5b3d9431f4305864869c
		
			
				
	
	
		
			155 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			155 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Copyright 2018 Red Hat, Inc.
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
# not use this file except in compliance with the License. You may obtain
 | 
						|
# a copy of the License at
 | 
						|
#
 | 
						|
#      http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
# License for the specific language governing permissions and limitations
 | 
						|
# under the License.
 | 
						|
 | 
						|
import socket
 | 
						|
import time
 | 
						|
 | 
						|
from selenium import webdriver
 | 
						|
from selenium.webdriver.common.by import By
 | 
						|
from selenium.webdriver.support.ui import WebDriverWait
 | 
						|
from selenium.common.exceptions import TimeoutException
 | 
						|
 | 
						|
def take_screenshots(host, shots):
    """Take full-page screenshots via the remote selenium server.

    Opens a Firefox session against the selenium hub listening on port
    4444 of the remote host, visits each URL in turn and saves a
    screenshot of the page's <body> element.  The browser session is
    always shut down before returning, whether or not a shot failed.

    host: the testinfra host info of the remote host where selenium is
          running
    shots: a list where each element is a list consisting of

    * (str) URL to screenshot
    * (str) Javascript to execute before shot, None to skip
    * (str) filename.png, will be placed in /var/log/screenshots for collection

    Errors are not handled here; e.g. selenium's TimeoutException from
    the 30 second wait for the page to finish loading propagates to
    the caller.
    """
    firefox_options = webdriver.FirefoxOptions()

    driver = webdriver.Remote(
        command_executor='http://%s:4444/wd/hub' % (host.backend.get_hostname()),
        options=firefox_options)

    try:
        for url, execute, png in shots:
            driver.get(url)
            # Block until the browser reports the document fully loaded.
            WebDriverWait(driver, 30).until(
                lambda driver: driver.execute_script(
                    'return document.readyState') == 'complete')

            if execute:
                # Presumably gives the page time to settle before the
                # caller's script runs against it.
                time.sleep(5)
                driver.execute_script(execute)

            # Presumably lets rendering (and any script effects) finish
            # before the shot is taken.
            time.sleep(5)

            # NOTE(ianw) This is a mash-up of things I found on
            # stackoverflow and other bits googling "full size
            # screenshot".  You expand the viewport and take a
            # shot of the <body> element so that you don't also
            # get scrollbars in the shot, with some tweaking
            # because of the window size.
            #
            # Update 2022-20-09 : The Firefox driver with Selenium 4
            # has a very simple get_full_page_screenshot_as_png()
            # which would be perfect -- but -- this only works when
            # using the local Firefox connection, not the remote
            # connection we are using here to talk to the docker
            # container.  I looked at switching this, but to talk to
            # the local firefox you need geckodriver -- and that
            # doesn't work with Ubuntu Jammy because Firefox is now
            # distributed as a snap, not a regular package, and it
            # doesn't work together [1] (apparently you can get around
            # it, but it's just other hacks [2]).  So we still have
            # this ...
            #
            # [1] https://bugs.launchpad.net/ubuntu/+source/firefox/+bug/1968266
            # [2] https://github.com/mozilla/geckodriver/releases/tag/v0.31.0

            original_size = driver.get_window_size()
            required_width = driver.execute_script(
                'return document.body.parentNode.scrollWidth')
            required_height = driver.execute_script(
                'return document.body.parentNode.scrollHeight') + 100
            driver.set_window_size(required_width, required_height)

            driver.find_element(By.TAG_NAME, 'body').\
                screenshot("/var/log/screenshots/%s" % png)

            # Put the window back so the next page is measured from the
            # same starting size.
            driver.set_window_size(
                original_size['width'], original_size['height'])
    finally:
        # No "except ...: raise" is needed to propagate errors; just
        # make sure the remote browser session is always released.
        driver.quit()
 | 
						|
 | 
						|
 | 
						|
 | 
						|
def get_ips(value, family=None):
    """Resolve a name to the set of IP addresses it maps to.

    value: (str) hostname (or address literal) to look up
    family: socket address family to restrict the lookup to, e.g.
            socket.AF_INET or socket.AF_INET6.  None (the default)
            means any family.

    Returns a set of address strings.  The set is empty if the lookup
    fails with socket.gaierror.
    """
    # socket.getaddrinfo() needs an integer family and raises TypeError
    # for None, so translate the "no preference" default explicitly.
    if family is None:
        family = socket.AF_UNSPEC
    try:
        addr_info = socket.getaddrinfo(value, None, family)
    except socket.gaierror:
        return set()
    # Each result is (family, type, proto, canonname, sockaddr); the
    # first element of sockaddr is the address string.
    return {addr[4][0] for addr in addr_info}
 | 
						|
 | 
						|
 | 
						|
def verify_iptables(host):
    """Assert that the expected base firewall rules exist on a host.

    Checks that every rule in the baseline list below is present in
    the host's iptables rules, and that each IPv4 and IPv6 address
    cacti.openstack.org currently resolves to has a rule accepting
    UDP port 161.

    host: the testinfra host to inspect

    Returns the list of (whitespace-stripped) IPv4 iptables rules so
    callers can make further assertions against it.

    Raises AssertionError naming the first rule found to be missing.
    """
    rules = [x.strip() for x in host.iptables.rules()]
    print('Comparing against rules:\n%s' % rules)

    needed_rules = [
        '-P INPUT ACCEPT',
        '-P FORWARD DROP',
        '-P OUTPUT ACCEPT',
        '-N openstack-INPUT',
        '-A INPUT -j openstack-INPUT',
        '-A openstack-INPUT -i lo -j ACCEPT',
        '-A openstack-INPUT -p icmp -m icmp --icmp-type any -j ACCEPT',
        '-A openstack-INPUT -m state --state RELATED,ESTABLISHED -j ACCEPT',
        '-A openstack-INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT',
        '-A openstack-OUTPUT -p tcp -m tcp --dport 25 --tcp-flags FIN,SYN,RST,ACK SYN -j REJECT --reject-with tcp-reset',
        '-A openstack-INPUT -j REJECT --reject-with icmp-admin-prohibited'
    ]
    for rule in needed_rules:
        assert rule in rules, 'Missing iptables rule: %s' % rule

    # Ensure all IPv4+6 addresses for cacti are allowed
    for ip in get_ips('cacti.openstack.org', socket.AF_INET):
        snmp = ('-A openstack-INPUT -s %s/32 -p udp -m udp'
                ' --dport 161 -j ACCEPT' % ip)
        assert snmp in rules, 'Missing iptables rule: %s' % snmp

    # TODO(ianw) add ip6tables support to testinfra iptables module
    # NOTE(review): check_output presumably returns the command output
    # as a single string, which makes the test below a substring match
    # rather than the exact-line match used for IPv4 -- confirm.
    ip6rules = host.check_output('ip6tables -S')
    for ip in get_ips('cacti.openstack.org', socket.AF_INET6):
        snmp = ('-A openstack-INPUT -s %s/128 -p udp -m udp'
                ' --dport 161 -j ACCEPT' % ip)
        assert snmp in ip6rules, 'Missing ip6tables rule: %s' % snmp

    return rules
 | 
						|
 | 
						|
 | 
						|
def check_unreachable(addr, port, errno=113):
    """Assert that a TCP connection to addr:port fails as expected.

    addr: (str) host to connect to
    port: (int) TCP port to connect to
    errno: (int) the errno the failed connection must report; the
           default of 113 is "no route to host"

    Returns None when the connection fails with the expected errno.

    Raises AssertionError if the connection succeeds, or if it fails
    with any other errno.
    """
    try:
        s = socket.create_connection((addr, port), timeout=10)
    except OSError as e:
        # The connection must fail in the specific way requested.
        assert e.errno == errno, (
            'Connection to %s:%s failed with errno %s, expected %s'
            % (addr, port, e.errno, errno))
    else:
        s.close()
        # We should always error.  Raise explicitly instead of using
        # "assert False", which is removed when Python runs with -O
        # and would let a successful connection pass unnoticed.
        raise AssertionError(
            'Connection to %s:%s unexpectedly succeeded' % (addr, port))
 |