# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import json import logging import os import sys import testtools import time import urlparse from glanceclient import client as gclient from keystoneclient.v2_0 import client as ksclient from muranoclient import client as mclient from selenium.common import exceptions as exc from selenium import webdriver from selenium.webdriver.common.action_chains import ActionChains import selenium.webdriver.common.by as by from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import ui import config.config as cfg from muranodashboard.tests.functional import consts from muranodashboard.tests.functional import utils log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) log.addHandler(logging.StreamHandler()) if sys.version_info >= (2, 7): class BaseDeps(testtools.TestCase): pass else: # Define asserts for python26 import unittest2 class BaseDeps(testtools.TestCase, unittest2.TestCase): pass class UITestCase(BaseDeps): @classmethod def setUpClass(cls): cls.keystone_client = ksclient.Client(username=cfg.common.user, password=cfg.common.password, tenant_name=cfg.common.tenant, auth_url=cfg.common.keystone_url) cls.murano_client = mclient.Client( '1', endpoint=cfg.common.murano_url, token=cls.keystone_client.auth_token) cls.url_prefix = urlparse.urlparse(cfg.common.horizon_url).path or '' if cls.url_prefix.endswith('/'): cls.url_prefix = cls.url_prefix[:-1] def setUp(self): super(UITestCase, self).setUp() self.driver = webdriver.Firefox() self.driver.maximize_window() self.driver.get(cfg.common.horizon_url + '/murano/environments') self.driver.implicitly_wait(30) self.addOnException(self.take_screenshot) self.log_in() def tearDown(self): super(UITestCase, self).tearDown() self.driver.quit() for env in self.murano_client.environments.list(): self.remove_environment(env.id) def remove_environment(self, environment_id, timeout=180): self.murano_client.environments.delete(environment_id) start_time = time.time() while time.time() - start_time < timeout: try: self.murano_client.environments.get(environment_id) time.sleep(1) except Exception: # TODO(smurashov): bug/1378764 replace Exception to NotFound return raise Exception( 'Environment {0} was not deleted in {1} seconds'.format( environment_id, timeout)) def take_screenshot(self, exception): """Taking screenshot on error This decorators will take a screenshot of the browser when the test failed or when exception raised on the test. Screenshot will be saved as PNG inside screenshots folder. """ name = self._testMethodName log.exception('{0} failed'.format(name)) screenshot_dir = './screenshots' if not os.path.exists(screenshot_dir): os.makedirs(screenshot_dir) filename = os.path.join(screenshot_dir, name + '.png') self.driver.get_screenshot_as_file(filename) def log_in(self): self.fill_field(by.By.ID, 'id_username', cfg.common.user) self.fill_field(by.By.ID, 'id_password', cfg.common.password) self.driver.find_element_by_xpath("//button[@type='submit']").click() murano = self.driver.find_element_by_xpath(consts.Murano) if 'active' not in murano.get_attribute('class'): murano.click() def fill_field(self, by_find, field, value): self.driver.find_element(by=by_find, value=field).clear() self.driver.find_element(by=by_find, value=field).send_keys(value) def get_element_id(self, el_name): el = ui.WebDriverWait(self.driver, 10).until( EC.presence_of_element_located( (by.By.XPATH, consts.AppPackageDefinitions.format(el_name)))) path = el.get_attribute("id") return path.split('__')[-1] def select_and_click_action_for_app(self, action, app): self.driver.find_element_by_xpath( "//*[@href='{0}/murano/catalog/{1}/{2}']".format(self.url_prefix, action, app)).click() def go_to_submenu(self, link): element = self.wait_element_is_clickable(by.By.PARTIAL_LINK_TEXT, link) element.click() self.wait_for_sidebar_is_loaded() def check_panel_is_present(self, panel_name): self.assertIn(panel_name, self.driver.find_element_by_xpath( ".//*[@class='page-header']").text) def navigate_to(self, menu): el = ui.WebDriverWait(self.driver, 10).until( EC.presence_of_element_located( (by.By.XPATH, getattr(consts, menu)))) if 'active' not in el.get_attribute('class'): el.click() self.wait_for_sidebar_is_loaded() def select_from_list(self, list_name, value): locator = (by.By.XPATH, "//select[contains(@name, '{0}')]" "/option[@value='{1}']".format(list_name, value)) el = ui.WebDriverWait(self.driver, 10).until( EC.presence_of_element_located(locator)) el.click() def check_element_on_page(self, method, value, sec=10): try: ui.WebDriverWait(self.driver, sec).until( EC.presence_of_element_located((method, value))) except exc.TimeoutException: self.fail("Element {0} is not preset on the page".format(value)) def check_element_not_on_page(self, method, value): self.driver.implicitly_wait(3) present = True try: self.driver.find_element(method, value) except (exc.NoSuchElementException, exc.ElementNotVisibleException): present = False self.assertFalse(present, "Element {0} is preset on the page" " while it should't".format(value)) self.driver.implicitly_wait(30) def create_environment(self, env_name, by_id=False): if by_id: self.driver.find_element_by_id( 'murano__action_CreateEnvironment').click() else: self.driver.find_element_by_css_selector( consts.CreateEnvironment).click() self.fill_field(by.By.ID, 'id_name', env_name) self.driver.find_element_by_id(consts.ConfirmCreateEnvironment).click() self.wait_for_alert_message() def delete_environment(self, env_name): self.select_action_for_environment(env_name, 'delete') self.driver.find_element_by_xpath(consts.ConfirmDeletion).click() self.wait_for_alert_message() def edit_environment(self, old_name, new_name): el_td = self.driver.find_element_by_css_selector( 'tr[data-display="{0}"] td:first-of-type'.format(old_name)) el_pencil = el_td.find_element_by_css_selector( 'button.ajax-inline-edit') # hover to make pencil visible hover = ActionChains(self.driver).move_to_element(el_td) hover.perform() el_pencil.click() # fill in inline input el_inline_input = self.driver.find_element_by_css_selector( 'tr[data-display="{0}"] '.format(old_name) + 'td:first-of-type .inline-edit-form input') el_inline_input.clear() el_inline_input.send_keys(new_name) # click submit el_submit = self.driver.find_element_by_css_selector( 'tr[data-display="{0}"] '.format(old_name) + 'td:first-of-type .inline-edit-actions button[type="submit"]') el_submit.click() # there is no alert message def select_action_for_environment(self, env_name, action): element_id = self.get_element_id(env_name) more_button = consts.More.format('murano', element_id) self.driver.find_element_by_xpath(more_button).click() btn_id = "murano__row_{0}__action_{1}".format(element_id, action) self.driver.find_element_by_id(btn_id).click() def wait_for_alert_message(self): locator = (by.By.CSS_SELECTOR, 'div.alert-success') log.debug("Waiting for a success message") ui.WebDriverWait(self.driver, 2).until( EC.presence_of_element_located(locator)) def wait_element_is_clickable(self, method, element): return ui.WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((method, element))) def wait_for_sidebar_is_loaded(self): ui.WebDriverWait(self.driver, 10).until( EC.presence_of_element_located( (by.By.CSS_SELECTOR, "div.sidebar dt.active"))) time.sleep(0.5) class PackageBase(UITestCase): @classmethod def setUpClass(cls): super(PackageBase, cls).setUpClass() cls.mockapp_id = utils.upload_app_package( cls.murano_client, "MockApp", {"categories": ["Web"], "tags": ["tag"]}) cls.postgre_id = utils.upload_app_package( cls.murano_client, "PostgreSQL", {"categories": ["Databases"], "tags": ["tag"]}) @classmethod def tearDownClass(cls): super(PackageBase, cls).tearDownClass() cls.murano_client.packages.delete(cls.mockapp_id) cls.murano_client.packages.delete(cls.postgre_id) class ImageTestCase(PackageBase): @classmethod def setUpClass(cls): super(ImageTestCase, cls).setUpClass() glance_endpoint = cls.keystone_client.service_catalog.url_for( service_type='image', endpoint_type='publicURL') cls.glance = gclient.Client('1', endpoint=glance_endpoint, token=cls.keystone_client.auth_token) cls.image_title = 'New Image ' + str(time.time()) cls.image = cls.upload_image(cls.image_title) @classmethod def tearDownClass(cls): super(ImageTestCase, cls).tearDownClass() cls.glance.images.delete(cls.image.id) @classmethod def upload_image(cls, title): try: property = {'murano_image_info': json.dumps({'title': title, 'type': 'linux'})} image = cls.glance.images.create(name='TestImage', disk_format='qcow2', size=0, is_public=True, properties=property) except Exception as e: log.exception("Unable to create or update image in Glance") raise e return image def select_and_click_element(self, element): self.driver.find_element_by_xpath( ".//*[@value = '{0}']".format(element)).click() def repair_image(self): self.driver.find_element_by_id( 'marked_images__action_mark_image').click() self.select_from_list('image', self.image.id) self.fill_field(by.By.ID, 'id_title', self.image_title) self.select_from_list('type', 'linux') self.select_and_click_element('Mark Image') self.check_element_on_page(by.By.XPATH, consts.TestImage.format(self.image_title)) class FieldsTestCase(PackageBase): def check_error_message_is_present(self, error_message): self.driver.find_element_by_xpath(consts.ButtonSubmit).click() self.driver.find_element_by_xpath( consts.ErrorMessage.format(error_message)) def check_error_message_is_absent(self, error_message): self.driver.find_element_by_xpath(consts.ButtonSubmit).click() self.driver.implicitly_wait(2) try: self.driver.find_element_by_xpath( consts.ErrorMessage.format(error_message)) except (exc.NoSuchElementException, exc.ElementNotVisibleException): log.info("Message {0} is not" " present on the page".format(error_message)) self.driver.implicitly_wait(30) class ApplicationTestCase(ImageTestCase): def delete_component(self, component_name): component_id = self.get_element_id(component_name) self.driver.find_element_by_id( 'services__row_{0}__action_delete'.format(component_id)).click() el = self.wait_element_is_clickable(by.By.LINK_TEXT, 'Delete Component') el.click() self.wait_for_alert_message() def select_action_for_package(self, package, action): package_id = self.get_element_id(package) if action == 'more': el = self.wait_element_is_clickable( by.By.XPATH, "//tr[@data-object-id='{0}']" "//a[@data-toggle='dropdown']".format(package_id)) el.click() ui.WebDriverWait(self.driver, 10).until(lambda s: s.find_element( by.By.XPATH, ".//*[@id='packages__row_{0}__action_download_package']". format(package_id)).is_displayed()) else: self.driver.find_element_by_xpath( ".//*[@id='packages__row_{0}__action_{1}']". format(package_id, action)).click() def check_package_parameter(self, package, column, value): columns = {'Active': 3, 'Public': 4} package_id = self.get_element_id(package) column_num = str(columns[column]) column_element = self.driver.find_element_by_xpath( "//tr[@data-object-id='{0}']/td[{1}]".format(package_id, column_num)) self.assertTrue(column_element.text == value, "'{0}' column doesn't contain '{1}'".format(column, value)) def modify_package(self, param, value): self.fill_field(by.By.ID, 'id_{0}'.format(param), value) self.driver.find_element_by_xpath(consts.InputSubmit).click() self.wait_for_alert_message() def add_app_to_env(self, app_id, app_name='TestApp'): self.go_to_submenu('Applications') self.select_and_click_action_for_app('quick-add', app_id) field_id = "{0}_0-name".format(app_id) self.fill_field(by.By.ID, field_id, value=app_name) self.driver.find_element_by_xpath(consts.ButtonSubmit).click() self.driver.find_element_by_xpath(consts.InputSubmit).click() self.select_from_list('osImage', self.image.id) self.driver.find_element_by_xpath(consts.InputSubmit).click() self.wait_for_alert_message() class PackageTestCase(ApplicationTestCase): @classmethod def setUpClass(cls): super(ApplicationTestCase, cls).setUpClass() cls.archive_name = "ToUpload" cls.alt_archive_name = "ModifiedAfterUpload" cls.archive = utils.compose_package(cls.archive_name, consts.Manifest, consts.PackageDir) def tearDown(self): super(PackageTestCase, self).tearDown() for package in self.murano_client.packages.list(include_disabled=True): if package.name in [self.archive_name, self.alt_archive_name]: self.murano_client.packages.delete(package.id) @classmethod def tearDownClass(cls): super(ApplicationTestCase, cls).tearDownClass() if os.path.exists(consts.Manifest): os.remove(consts.Manifest) if os.path.exists(cls.archive): os.remove(cls.archive)