murano-dashboard/muranodashboard/tests/functional/base.py

613 lines
26 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from io import BytesIO
import json
import logging
import os
import testtools
import time
import urllib.parse as urlparse
import uuid
from glanceclient import client as gclient
from keystoneauth1.identity import v3
from keystoneauth1 import session as ks_session
from keystoneclient.v3 import client as ks_client
from muranoclient.common import exceptions as muranoclient_exc
from muranoclient.glance import client as glare_client
import muranoclient.v1.client as mclient
from oslo_log import handlers
from oslo_log import log
from selenium.common import exceptions as exc
from selenium import webdriver
import selenium.webdriver.common.by as by
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support import ui
from muranodashboard.tests.functional.config import config as cfg
from muranodashboard.tests.functional import consts
from muranodashboard.tests.functional import utils
logger = log.getLogger(__name__).logger
logger.level = logging.DEBUG
logger.addHandler(handlers.ColorHandler())
class UITestCase(testtools.TestCase):
@classmethod
def setUpClass(cls):
auth = v3.Password(user_domain_name='Default',
username=cfg.common.user,
password=cfg.common.password,
project_domain_name='Default',
project_name=cfg.common.tenant,
auth_url=cfg.common.keystone_url)
session = ks_session.Session(auth=auth)
cls.keystone_client = ks_client.Client(session=session)
cls.auth_ref = auth.get_auth_ref(session)
cls.service_catalog = cls.auth_ref.service_catalog
if utils.glare_enabled():
glare_endpoint = "http://127.0.0.1:9494"
artifacts_client = glare_client.Client(
endpoint=glare_endpoint, token=cls.auth_ref.auth_token,
insecure=False, key_file=None, ca_file=None, cert_file=None,
type_name="murano", type_version=1)
else:
artifacts_client = None
cls.murano_client = mclient.Client(
artifacts_client=artifacts_client,
endpoint_override=cfg.common.murano_url,
session=session)
cls.url_prefix = urlparse.urlparse(cfg.common.horizon_url).path or ''
if cls.url_prefix.endswith('/'):
cls.url_prefix = cls.url_prefix[:-1]
def setUp(self):
super(UITestCase, self).setUp()
# Enables zip files to be automatically saved to disk, without opening
# a browser dialog.
fp = webdriver.FirefoxProfile()
fp.set_preference("browser.download.folderList", 2)
fp.set_preference("browser.download.manager.showWhenStarting", False)
fp.set_preference("browser.download.dir", os.getcwd())
fp.set_preference("browser.helperApps.neverAsk.saveToDisk",
"application/octet-stream")
self.driver = webdriver.Firefox(firefox_profile=fp)
self.addCleanup(self.driver.quit)
self.driver.maximize_window()
self.driver.get(cfg.common.horizon_url + '/app-catalog/environments')
self.driver.implicitly_wait(30)
self.addOnException(self.take_screenshot)
self.log_in()
self.switch_to_project(cfg.common.tenant)
def tearDown(self):
super(UITestCase, self).tearDown()
for env in self.murano_client.environments.list():
self.remove_environment(env.id)
def gen_random_resource_name(self, prefix=None, reduce_by=None):
random_name = str(uuid.uuid4()).replace('-', '')[::reduce_by]
if prefix:
random_name = prefix + '_' + random_name
return random_name
def remove_environment(self, environment_id, timeout=180):
self.murano_client.environments.delete(environment_id)
start_time = time.time()
while time.time() - start_time < timeout:
try:
self.murano_client.environments.get(environment_id)
time.sleep(1)
except Exception:
# TODO(smurashov): bug/1378764 replace Exception to NotFound
return
raise Exception(
'Environment {0} was not deleted in {1} seconds'.format(
environment_id, timeout))
@classmethod
def create_user(cls, name, password=None, email=None, tenant_id=None):
if tenant_id is None:
projects = cls.keystone_client.projects.list()
tenant_id = [project.id for project in projects
if project.name == cfg.common.tenant][0]
cls.keystone_client.users.create(name, domain='default',
password=password,
email=email,
project=tenant_id,
enabled=True)
else:
cls.keystone_client.users.create(name, domain='default',
password=password,
email=email,
project=tenant_id,
enabled=True)
roles = cls.keystone_client.roles.list()
role_id = [role.id for role in roles if role.name == 'member'][0]
users = cls.keystone_client.users.list()
user_id = [user.id for user in users if user.name == name][0]
cls.keystone_client.roles.grant(role_id, user=user_id,
project=tenant_id)
@classmethod
def delete_user(cls, name):
cls.keystone_client.users.find(name=name).delete()
def get_tenantid_by_name(self, name):
"""Returns TenantID of the project by project's name"""
tenant_id = [tenant.id for tenant
in self.keystone_client.projects.list()
if tenant.name == name]
return tenant_id[0]
def add_user_to_project(self, project_id, user_id, user_role=None):
if not user_role:
roles = self.keystone_client.roles.list()
role_id = [role.id for role in roles if role.name == 'member'][0]
if not user_id:
user_name = cfg.common.user
users = self.keystone_client.users.list()
user_id = [user.id for user in users if user.name == user_name][0]
self.keystone_client.roles.grant(role_id, user=user_id,
project=project_id)
def switch_to_project(self, name):
projects_xpath = ("//ul[contains(@class, navbar-nav)]"
"//li[contains(@class, dropdown)]")
name_xpath = ("//a//span[contains(@class, dropdown-title)"
"and normalize-space(text())='{0}']".format(name))
btn_xpath = "//a[contains(@class, dropdown-toggle) and @href='#']"
projects_list = self.driver.find_element_by_xpath(projects_xpath)
if projects_list.text != name:
if 'open' not in projects_list.get_attribute('class'):
projects_list.find_element_by_xpath(btn_xpath).click()
projects_list.find_element_by_xpath(name_xpath).click()
self.wait_for_alert_message()
# else the project is already set
def take_screenshot(self, exception):
"""Taking screenshot on error
This decorators will take a screenshot of the browser
when the test failed or when exception raised on the test.
Screenshot will be saved as PNG inside screenshots folder.
"""
name = self._testMethodName
logger.error('{0} failed'.format(name))
screenshot_dir = './screenshots'
if not os.path.exists(screenshot_dir):
os.makedirs(screenshot_dir)
filename = os.path.join(screenshot_dir, name + '.png')
self.driver.get_screenshot_as_file(filename)
def log_in(self, username=None, password=None):
username = username or cfg.common.user
password = password or cfg.common.password
self.fill_field(by.By.ID, 'id_username', username)
self.fill_field(by.By.ID, 'id_password', password)
self.driver.find_element_by_xpath("//button[@type='submit']").click()
murano = self.driver.find_element_by_xpath(consts.AppCatalog)
if 'collapsed' in murano.get_attribute('class'):
murano.click()
def log_out(self):
user_menu = self.driver.find_element(
by.By.XPATH, "//ul[contains(@class, 'navbar-right')]")
user_menu.find_element(
by.By.XPATH, ".//span[@class='user-name']").click()
user_menu.find_element(by.By.PARTIAL_LINK_TEXT, 'Sign Out').click()
def fill_field(self, by_find, field, value):
self.check_element_on_page(by_find, field)
self.wait_element_is_clickable(by_find, field)
self.driver.find_element(by=by_find, value=field).clear()
self.driver.find_element(by=by_find, value=field).send_keys(value)
def get_element_id(self, el_name, sec=10):
el = ui.WebDriverWait(self.driver, sec).until(
EC.presence_of_element_located(
(by.By.XPATH, consts.AppPackages.format(el_name))))
path = el.get_attribute("id")
return path.split('__')[-1]
def select_and_click_action_for_app(self, action, app):
self.check_element_on_page(
by.By.XPATH,
"//*[@href='{0}/app-catalog/catalog/{1}/{2}']".format(
self.url_prefix, action, app))
self.wait_element_is_clickable(
by.By.XPATH,
"//*[@href='{0}/app-catalog/catalog/{1}/{2}']".format(
self.url_prefix, action, app)).click()
def go_to_submenu(self, link):
element = self.wait_element_is_clickable(by.By.PARTIAL_LINK_TEXT, link)
element.click()
self.wait_for_sidebar_is_loaded()
def check_panel_is_present(self, panel_name):
self.assertIn(panel_name,
self.driver.find_element_by_xpath(
".//*[@class='page-header']").text)
def navigate_to(self, menu):
el = self.wait_element_is_clickable(
by.By.XPATH, getattr(consts, menu))
if 'collapsed' in el.get_attribute('class'):
el.click()
def select_from_list(self, list_name, value, sec=10):
locator = (by.By.XPATH,
"//select[contains(@name, '{0}')]"
"/option[@value='{1}']".format(list_name, value))
el = ui.WebDriverWait(self.driver, sec).until(
EC.presence_of_element_located(locator))
el.click()
def check_element_on_page(self, method, value, sec=10):
try:
ui.WebDriverWait(self.driver, sec).until(
EC.presence_of_element_located((method, value)))
except exc.TimeoutException:
self.fail("Element {0} is not present on the page".format(value))
def check_element_not_on_page(self, method, value, sec=3):
self.driver.implicitly_wait(sec)
present = True
try:
self.driver.find_element(method, value)
except (exc.NoSuchElementException, exc.ElementNotVisibleException):
present = False
self.assertFalse(present, u"Element {0} is present on the page"
" while it should't".format(value))
self.driver.implicitly_wait(30)
def check_alert_message(self, message, sec=10):
locator = (by.By.CSS_SELECTOR, 'div.alert-dismissable')
try:
ui.WebDriverWait(self.driver, sec).until(
EC.presence_of_element_located(locator))
except exc.TimeoutException:
self.fail("Alert is not present on the page")
self.assertIn(message, self.driver.find_element(*locator).text)
def create_environment(self, env_name, by_id=False):
if by_id:
self.driver.find_element_by_id(
'environments__action_CreateEnvironment').click()
else:
self.driver.find_element_by_css_selector(
consts.CreateEnvironment).click()
self.fill_field(by.By.ID, 'id_name', env_name)
self.driver.find_element_by_id(consts.ConfirmCreateEnvironment).click()
self.wait_for_alert_message()
def delete_environment(self, env_name, from_detail_view=False):
if not from_detail_view:
self.select_action_for_environment(env_name, 'delete')
self.driver.find_element_by_xpath(consts.ConfirmDeletion).click()
self.wait_for_alert_message()
def select_action_for_environment(self, env_name, action):
element_id = self.get_element_id(env_name)
more_button = consts.More.format('environments', element_id)
self.driver.find_element_by_xpath(more_button).click()
btn_id = "environments__row_{0}__action_{1}".format(element_id, action)
self.driver.find_element_by_id(btn_id).click()
def wait_for_alert_message(self, sec=5):
locator = (by.By.CSS_SELECTOR, 'div.alert-success')
logger.debug("Waiting for a success message")
ui.WebDriverWait(self.driver, sec).until(
EC.presence_of_element_located(locator))
def wait_for_alert_message_to_disappear(self, sec=10):
# The alert message pops up directly over the delete environment
# button, causing click issues. So we must wait for the alert message
# to completely disappear before clicking the button.
locator = (by.By.CSS_SELECTOR, 'div.alert-success')
logger.debug("Waiting for a success message to disappear")
ui.WebDriverWait(self.driver, sec).until(
EC.invisibility_of_element_located(locator))
def wait_for_error_message(self, sec=20):
locator = (by.By.CSS_SELECTOR, 'div.alert-danger > p')
logger.debug("Waiting for an error message")
ui.WebDriverWait(self.driver, sec, 1).until(
EC.presence_of_element_located(locator))
return self.driver.find_element(*locator).text
def wait_element_is_clickable(self, method, element, sec=10):
return ui.WebDriverWait(self.driver, sec).until(
EC.element_to_be_clickable((method, element)))
def wait_for_sidebar_is_loaded(self, sec=10):
ui.WebDriverWait(self.driver, sec).until(
EC.presence_of_element_located(
(by.By.CSS_SELECTOR, "nav#sidebar a.active")))
time.sleep(0.5)
@contextlib.contextmanager
def wait_for_page_reload(self, sec=10):
old_page = self.driver.find_element_by_tag_name('html')
yield
ui.WebDriverWait(self, sec).until(EC.staleness_of(old_page))
class PackageBase(UITestCase):
@classmethod
def setUpClass(cls):
super(PackageBase, cls).setUpClass()
cls.packages = []
cls.mockapp_id = cls.upload_package(
"MockApp",
{"categories": ["Web"], "tags": ["tag"]})
cls.postgre_id = cls.upload_package(
"PostgreSQL",
{"categories": ["Databases"], "tags": ["tag"]})
cls.hot_app_id = cls.upload_package(
"HotExample",
{"tags": ["hot"]}, hot=True)
cls.deployingapp_id = cls.upload_package(
"DeployingApp",
{"categories": ["Web"], "tags": ["tag"]},
hot=False,
package_dir=consts.DeployingPackageDir)
@classmethod
def upload_package(cls, name, data, **kwargs):
package = utils.upload_app_package(cls.murano_client, name, data,
**kwargs)
cls.packages.append(package)
return package
@classmethod
def tearDownClass(cls):
super(PackageBase, cls).tearDownClass()
# In case dynamically created packages are deleted at test level,
# ignore not found errors below.
for package in cls.packages:
try:
cls.murano_client.packages.delete(package)
except muranoclient_exc.HTTPNotFound:
pass
class ImageTestCase(PackageBase):
@classmethod
def setUpClass(cls):
super(ImageTestCase, cls).setUpClass()
glance_endpoint = cls.service_catalog.url_for(service_type='image')
cls.glance = gclient.Client('2', endpoint=glance_endpoint,
session=cls.keystone_client.session)
def setUp(self):
super(ImageTestCase, self).setUp()
self.image_title = self.gen_random_resource_name('default-image', 15)
self.image = self.upload_image(self.image_title)
def tearDown(self):
super(ImageTestCase, self).tearDown()
self.glance.images.delete(self.image.id)
@classmethod
def upload_image(cls, title):
try:
murano_property = json.dumps({'title': title, 'type': 'linux'})
image = cls.glance.images.create(name='TestImage',
disk_format='qcow2',
container_format='bare',
is_public='True',
murano_image_info=murano_property)
image_data = BytesIO(None)
cls.glance.images.upload(image['id'], image_data)
except Exception:
logger.error("Unable to create or update image in Glance")
raise
return image
def select_and_click_element(self, element):
self.driver.find_element_by_xpath(
".//*[@value = '{0}']".format(element)).click()
class FieldsTestCase(PackageBase):
def check_js_error_message_is_present(self, error_message):
self.driver.implicitly_wait(2)
self.driver.find_element_by_xpath(
consts.JsErrorMessage.format(error_message))
self.driver.implicitly_wait(30)
def check_js_error_message_is_absent(self, error_message):
self.driver.implicitly_wait(2)
try:
self.driver.find_element_by_xpath(
consts.ErrorMessage.format(error_message))
except (exc.NoSuchElementException, exc.ElementNotVisibleException):
logger.info("Message {0} is not"
" present on the page".format(error_message))
self.driver.implicitly_wait(30)
def check_error_message_is_present(self, error_message):
self.driver.find_element_by_xpath(consts.ButtonSubmit).click()
self.driver.find_element_by_xpath(
consts.ErrorMessage.format(error_message))
def check_error_message_is_absent(self, error_message):
self.driver.find_element_by_xpath(consts.ButtonSubmit).click()
self.driver.implicitly_wait(2)
try:
self.driver.find_element_by_xpath(
consts.ErrorMessage.format(error_message))
except (exc.NoSuchElementException, exc.ElementNotVisibleException):
logger.info("Message {0} is not"
" present on the page".format(error_message))
self.driver.implicitly_wait(30)
class ApplicationTestCase(ImageTestCase):
def delete_component(self, component_name=None):
if component_name:
component_id = self.get_element_id(component_name)
btn = self.wait_element_is_clickable(
by.By.ID, 'services__row_{0}__action_delete'.format(
component_id))
else:
btn = self.wait_element_is_clickable(by.By.CSS_SELECTOR,
consts.DeleteComponent)
btn.click()
el = self.wait_element_is_clickable(by.By.LINK_TEXT,
'Delete Component', sec=30)
el.click()
self.wait_for_alert_message()
def select_action_for_package(self, package_id, action, sec=10):
if action == 'more':
el = self.wait_element_is_clickable(
by.By.XPATH, "//tr[@data-object-id='{0}']"
"//a[@data-toggle='dropdown']".format(package_id))
el.click()
ui.WebDriverWait(self.driver, sec).until(lambda s: s.find_element(
by.By.XPATH,
".//*[@id='packages__row_{0}__action_download_package']".
format(package_id)).is_displayed())
else:
self.driver.find_element_by_xpath(
".//*[@id='packages__row_{0}__action_{1}']".
format(package_id, action)).click()
def check_package_parameter(self, selector, column, value):
columns = {'Tenant Name': 3, 'Active': 4, 'Public': 5}
column_num = str(columns[column])
column_element = self.driver.find_element_by_xpath(
"//tr[{0}]/td[{1}]".format(selector, column_num))
self.assertTrue(column_element.text == value,
"'{0}' column doesn't contain '{1}'".format(column,
value))
def check_package_parameter_by_id(self, package_id, column, value):
selector = '@data-object-id="{0}"'.format(package_id)
self.check_package_parameter(selector, column, value)
def check_package_parameter_by_name(self, package_name, column, value):
selector = '@data-display="{0}"'.format(package_name)
self.check_package_parameter(selector, column, value)
def modify_package(self, param, value):
self.fill_field(by.By.ID, 'id_{0}'.format(param), value)
self.driver.find_element_by_xpath(consts.InputSubmit).click()
self.wait_for_alert_message()
def add_app_to_env(self, app_id, app_name='TestApp', env_id=None):
self.navigate_to('Browse')
self.go_to_submenu('Browse Local')
if env_id:
action = 'add'
app = '{0}/{1}'.format(app_id, env_id)
else:
action = 'quick-add'
app = app_id
self.select_and_click_action_for_app(action, app)
field_id = "{0}_0-name".format(app_id)
self.fill_field(by.By.ID, field_id, value=app_name)
self.wait_element_is_clickable(by.By.XPATH,
consts.ButtonSubmit).click()
self.wait_element_is_clickable(by.By.XPATH,
consts.InputSubmit).click()
self.select_from_list('osImage', self.image.id)
if env_id:
# If another app is added, then env_id is passed in. In this case,
# the 'Next' followed by 'Create' must be clicked.
self.check_element_on_page(by.By.CSS_SELECTOR,
consts.NextWizardSubmit)
self.wait_element_is_clickable(
by.By.CSS_SELECTOR, consts.NextWizardSubmit).click()
self.check_element_on_page(by.By.CSS_SELECTOR,
consts.CreateWizardSubmit)
self.wait_element_is_clickable(
by.By.CSS_SELECTOR, consts.CreateWizardSubmit).click()
self.wait_element_is_clickable(by.By.ID, consts.AddComponent)
self.check_element_on_page(by.By.LINK_TEXT, app_name)
else:
# Otherwise, only 'Create' needs to be clicked.
self.check_element_on_page(by.By.CSS_SELECTOR,
consts.CreateWizardSubmit)
self.wait_element_is_clickable(
by.By.CSS_SELECTOR, consts.CreateWizardSubmit).click()
self.wait_for_alert_message()
def execute_action_from_table_view(self, env_name, table_action):
"""Executes an action like Deploy or Delete from the table view.
Does not handle clicking on the confirmation modal that may appear.
Scenario:
1. Checks for the table drop-down button and then clicks it.
2. Checks for the table drop-down menu to appear.
3. Checks for the ``table_action`` button and then clicks it.
"""
self.check_element_on_page(
by.By.XPATH, consts.TableDropdownBtn.format(env_name))
dropdown_btn = self.driver.find_element(
by.By.XPATH, consts.TableDropdownBtn.format(env_name))
dropdown_btn.click()
self.check_element_on_page(by.By.XPATH,
consts.TableDropdownMenu.format(env_name))
self.check_element_on_page(
by.By.XPATH, consts.TableDropdownAction.format(env_name,
table_action))
action_btn = self.driver.find_element(
by.By.XPATH, consts.TableDropdownAction.format(env_name,
table_action))
action_btn.click()
class PackageTestCase(ApplicationTestCase):
@classmethod
def setUpClass(cls):
super(ApplicationTestCase, cls).setUpClass()
cls.archive_name = "ToUpload"
cls.alt_archive_name = "ModifiedAfterUpload"
cls.manifest = os.path.join(consts.PackageDir, 'manifest.yaml')
cls.archive = utils.compose_package(cls.archive_name, cls.manifest,
consts.PackageDir)
def tearDown(self):
super(PackageTestCase, self).tearDown()
for package in self.murano_client.packages.list(include_disabled=True):
if package.name in [self.archive_name, self.alt_archive_name]:
self.murano_client.packages.delete(package.id)
@classmethod
def tearDownClass(cls):
super(ApplicationTestCase, cls).tearDownClass()
if os.path.exists(cls.manifest):
os.remove(cls.manifest)
if os.path.exists(cls.archive):
os.remove(cls.archive)