# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import os
import re
import time
import urllib2

from devops.helpers import helpers
from devops.helpers.helpers import wait
from fuelweb_test import logger
from fuelweb_test.helpers import os_actions
from fuelweb_test.helpers.ssh_manager import SSHManager
from proboscis import asserts

from murano_plugin_tests.helpers import remote_ops
from murano_plugin_tests import settings


PLUGIN_PACKAGE_RE = re.compile(r'([^/]+)-(\d+\.\d+)-(\d+\.\d+\.\d+)')


class NotFound(Exception):
    pass


class TimeoutException(Exception):
    pass


def get_plugin_name(filename):
    """Extract the plugin name from the package filename.

    :param filename: the plugin's filename.
    :type filename: str
    :returns: the plugin's name or None if not found
    :rtype: str
    """
    m = PLUGIN_PACKAGE_RE.search(filename or '')
    if m:
        return m.group(1)
    else:
        return None


def get_plugin_version(filename):
    """Extract the plugin version from the package filename.

    :param filename: the plugin's filename.
    :type filename: str
    :returns: the plugin's version or None if not found
    :rtype: str
    """
    m = PLUGIN_PACKAGE_RE.search(filename or '')
    if m:
        return m.group(3)
    else:
        return None


def get_fixture(name):
    """Return the full path to a fixture."""
    path = os.path.join(os.environ.get("WORKSPACE", "./"), "fixtures", name)
    if not os.path.isfile(path):
        raise NotFound("File {} not found".format(path))
    return path


class PluginHelper(object):
    """Class for common helper functions."""

    def __init__(self, env):
        self.env = env
        self.fuel_web = self.env.fuel_web
        self._cluster_id = None
        self.nailgun_client = self.fuel_web.client
        self._os_conn = None
        self.ssh_manager = SSHManager()

    @property
    def cluster_id(self):
        if self._cluster_id is None:
            try:
                self._cluster_id = self.fuel_web.get_last_created_cluster()
            except urllib2.URLError:
                raise EnvironmentError("No cluster was created.")
        return self._cluster_id

    @cluster_id.setter
    def cluster_id(self, value):
        self._cluster_id = value

    @property
    def os_conn(self):
        if self._os_conn is None:
            self._os_conn = os_actions.OpenStackActions(
                self.fuel_web.get_public_vip(self.cluster_id))
        return self._os_conn

    @property
    def ceph_settings(self):
        """Return a dict with Ceph-related settings for the cluster."""
        return {
            'volumes_lvm': False,
            'volumes_ceph': True,
            'images_ceph': True,
            'objects_ceph': True,
            'ephemeral_ceph': True,
            'osd_pool_size': "1"
        }

    def prepare_plugin(self, plugin_path):
        """Upload and install a plugin from the given path."""
        self.env.admin_actions.upload_plugin(plugin=plugin_path)
        self.env.admin_actions.install_plugin(
            plugin_file_name=os.path.basename(plugin_path))
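    # A minimal usage sketch for activate_plugin() below (the plugin name,
    # version and option path are hypothetical): option keys are
    # "/"-separated paths into the plugin's attribute tree, so
    #
    #   helper.activate_plugin('murano-plugin', '1.0.0',
    #                          options={'murano_repo_url/value': 'http://x'})
    #
    # would descend into the plugin data for version 1.0.0 and set the
    # 'value' key of its 'murano_repo_url' attribute.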
    def activate_plugin(self, name, version, options=None, strict=False):
        """Enable and configure a plugin for the cluster.

        :param name: name of the plugin.
        :type name: str
        :param version: version of the plugin.
        :type version: str
        :param options: configuration of the plugin (optional).
        :type options: dict
        :param strict: whether or not to fail when setting an unknown
            option (default: False).
        :type strict: boolean
        :returns: None
        """
        if options is None:
            options = {}
        msg = "Plugin {0} isn't found.".format(name)
        asserts.assert_true(
            self.fuel_web.check_plugin_exists(self.cluster_id, name), msg)
        logger.info("Updating settings for plugin {0} ({1}): {2}".format(
            name, version, options))
        attributes = self.nailgun_client.get_cluster_attributes(
            self.cluster_id)
        attributes = attributes['editable'][name]
        logger.info("Plugin attrs: {0}".format(attributes))
        plugin_data = None
        for item in attributes['metadata']['versions']:
            if item['metadata']['plugin_version'] == version:
                plugin_data = item
                break
        asserts.assert_is_not_none(
            plugin_data,
            "Plugin {0} ({1}) is not found".format(name, version))
        attributes['metadata']['enabled'] = True
        for option, value in options.items():
            path = option.split("/")
            # Walk down the nested option structure one path element at a
            # time, starting from the plugin data for the requested version.
            plugin_option = plugin_data
            for p in path[:-1]:
                if p in plugin_option:
                    plugin_option = plugin_option[p]
                else:
                    msg = "Plugin option {} not found".format(option)
                    if strict:
                        raise NotFound(msg)
                    logger.warn(msg)
                    plugin_option = None
                    break
            if plugin_option is not None:
                plugin_option[path[-1]] = value
        self.nailgun_client.update_cluster_attributes(self.cluster_id, {
            "editable": {name: attributes}
        })

    def get_all_ready_nodes(self):
        return [node for node in
                self.nailgun_client.list_cluster_nodes(self.cluster_id)
                if node["status"] == "ready"]

    def create_cluster(self, name=None, opts=None, ssl=False):
        """Create a cluster.

        :param name: name of the cluster.
        :type name: str
        :param opts: optional dict containing the cluster's configuration.
        :type opts: dict
        :param ssl: whether or not to configure SSL for the cluster.
        :type ssl: bool
        :returns: the cluster's id
        :rtype: str
        """
        if not name:
            name = self.__class__.__name__
        if not opts:
            opts = self.ceph_settings
        self._cluster_id = self.env.fuel_web.create_cluster(
            name=name,
            settings=opts,
            mode='ha_compact',
            configure_ssl=ssl)
        return self._cluster_id

    def apply_changes(self, check_services=False):
        """Deploy the pending changes to the cluster.

        :param check_services: run OSTF after the deployment
            (default: False).
        """
        self.fuel_web.deploy_cluster_wait(self.cluster_id,
                                          check_services=check_services)

    def deploy_cluster(self, nodes_roles, verify_network=False,
                       update_interfaces=True, check_services=True):
        """Assign roles to nodes and deploy the cluster.

        :param nodes_roles: nodes to roles mapping.
        :type nodes_roles: dict
        :param verify_network: whether or not network verification should be
            run before the deployment (default: False).
        :type verify_network: boolean
        :param update_interfaces: whether or not interfaces should be updated
            before the deployment (default: True).
        :type update_interfaces: boolean
        :param check_services: whether or not OSTF tests should run after the
            deployment (default: True).
        :type check_services: boolean
        :returns: None
        """
        self.fuel_web.update_nodes(self.cluster_id, nodes_roles,
                                   update_interfaces=update_interfaces)
        if verify_network:
            self.fuel_web.verify_network(self.cluster_id)
        self.apply_changes(check_services=check_services)

    def run_ostf(self, *args, **kwargs):
        """Run the OpenStack health checks."""
        self.fuel_web.run_ostf(self.cluster_id, *args, **kwargs)

    def run_single_ostf(self, test_sets, test_name, *args, **kwargs):
        """Run a subset of the OpenStack health checks."""
        self.fuel_web.run_single_ostf_test(self.cluster_id, test_sets,
                                           test_name, *args, **kwargs)
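    # A minimal sketch of the nodes_roles mapping that deploy_cluster(),
    # add_nodes_to_cluster() and remove_nodes_from_cluster() expect (the
    # slave names and roles below are hypothetical):
    #
    #   helper.deploy_cluster({
    #       'slave-01': ['controller'],
    #       'slave-02': ['compute', 'cinder'],
    #   })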
    def add_nodes_to_cluster(self, nodes, redeploy=True,
                             check_services=False):
        """Add nodes to the cluster.

        :param nodes: list of nodes with their roles.
        :type nodes: dict
        :param redeploy: whether to redeploy the cluster (default: True).
        :type redeploy: boolean
        :param check_services: run OSTF after redeploy (default: False).
        :type check_services: boolean
        """
        self.fuel_web.update_nodes(
            self.cluster_id, nodes,
        )
        if redeploy:
            self.fuel_web.deploy_cluster_wait(self.cluster_id,
                                              check_services=check_services)

    def remove_nodes_from_cluster(self, nodes, redeploy=True,
                                  check_services=False):
        """Remove nodes from the cluster.

        :param nodes: list of nodes to remove from the cluster.
        :type nodes: dict
        :param redeploy: whether to redeploy the cluster (default: True).
        :type redeploy: boolean
        :param check_services: run OSTF after redeploy (default: False).
        :type check_services: boolean
        """
        self.fuel_web.update_nodes(
            self.cluster_id, nodes,
            pending_addition=False, pending_deletion=True,
        )
        if redeploy:
            self.fuel_web.deploy_cluster_wait(self.cluster_id,
                                              check_services=check_services)

    def get_master_node_by_role(self, role_name, excluded_nodes_fqdns=()):
        """Return the node running as the Designated Controller (DC)."""
        nodes = self.fuel_web.get_nailgun_cluster_nodes_by_roles(
            self.cluster_id, role_name)
        nodes = [node for node in nodes
                 if node['fqdn'] not in set(excluded_nodes_fqdns)]
        with self.fuel_web.get_ssh_for_nailgun_node(nodes[0]) as remote:
            stdout = remote.check_call(
                'pcs status cluster | grep "Current DC:"')["stdout"][0]
        for node in nodes:
            if node['fqdn'] in stdout:
                return node

    @staticmethod
    def full_vip_name(vip_name):
        return "".join(["vip__", vip_name])

    def get_node_with_vip(self, role_name, vip, exclude_node=None):
        nailgun_nodes = self.fuel_web.get_nailgun_cluster_nodes_by_roles(
            self.cluster_id, role_name)
        lma_nodes = self.fuel_web.get_devops_nodes_by_nailgun_nodes(
            nailgun_nodes)
        lma_node = None
        if exclude_node:
            for node in lma_nodes:
                if node.name != exclude_node.name:
                    lma_node = node
                    break
        else:
            lma_node = lma_nodes[0]
        return self.fuel_web.get_pacemaker_resource_location(
            lma_node.name, vip)[0]

    def wait_for_vip_migration(self, old_master, role_name, vip,
                               timeout=5 * 60):
        logger.info('Waiting for the migration of VIP {}'.format(vip))
        msg = "VIP {0} has not been migrated away from {1}".format(
            vip, old_master)
        helpers.wait(
            lambda: old_master != self.get_node_with_vip(
                role_name, vip, exclude_node=old_master),
            timeout=timeout, timeout_msg=msg)
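    # A minimal failover-check sketch built from the VIP helpers above and
    # power_off_node() below (the role and VIP names are hypothetical):
    #
    #   vip = helper.full_vip_name('management')
    #   old_master = helper.get_node_with_vip('controller', vip)
    #   helper.power_off_node(old_master)
    #   helper.wait_for_vip_migration(old_master, 'controller', vip)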
    def power_off_node(self, node):
        """Power off a node.

        :param node: Devops node.
        :type node: devops node instance
        """
        msg = 'Node {0} has not become offline after hard shutdown'.format(
            node.name)
        logger.info('Power off node %s', node.name)
        node.destroy()
        logger.info('Waiting for node %s to become offline', node.name)
        helpers.wait(
            lambda: not self.fuel_web.get_nailgun_node_by_devops_node(
                node)['online'], timeout=60 * 5, timeout_msg=msg)

    def update_cluster_settings(self, opts):
        attrs = self.fuel_web.client.get_cluster_attributes(self.cluster_id)
        for option in opts:
            section = ''
            if option in ('sahara', 'murano', 'ceilometer', 'mongo',
                          'ironic'):
                section = 'additional_components'
            elif option in {'mongo_db_name', 'mongo_replset', 'mongo_user',
                            'hosts_ip', 'mongo_password'}:
                section = 'external_mongo'
            elif option in {'volumes_ceph', 'images_ceph', 'ephemeral_ceph',
                            'objects_ceph', 'osd_pool_size', 'volumes_lvm',
                            'volumes_block_device', 'images_vcenter'}:
                section = 'storage'
            elif option in {'tenant', 'password', 'user'}:
                section = 'access'
            elif option == 'assign_to_all_nodes':
                section = 'public_network_assignment'
            elif option in {'neutron_l3_ha', 'neutron_dvr',
                            'neutron_l2_pop'}:
                section = 'neutron_advanced_configuration'
            elif option in {'dns_list'}:
                section = 'external_dns'
            elif option in {'ntp_list'}:
                section = 'external_ntp'
            elif option in {'propagate_task_deploy'}:
                section = 'common'
            if section:
                try:
                    attrs['editable'][section][option]['value'] = \
                        opts[option]
                except KeyError:
                    if section not in attrs['editable']:
                        raise KeyError(
                            "Section '{0}' not in "
                            "attributes['editable']: {1}".format(
                                section, attrs['editable'].keys()))
                    raise KeyError(
                        "Option {0} not in attributes['editable'][{1}]: "
                        "{2}".format(
                            option, section,
                            attrs['editable'][section].keys()))
        return self.fuel_web.client.update_cluster_attributes(
            self.cluster_id, attrs)

    def emulate_whole_network_disaster(self, delay_before_recover=5 * 60,
                                       wait_become_online=True):
        """Simulate a full network outage for all nodes.

        :param delay_before_recover: outage interval in seconds
            (default: 300).
        :type delay_before_recover: int
        :param wait_become_online: whether to wait for nodes to be back
            online.
        :type wait_become_online: bool
        """
        nodes = [node for node in self.env.d_env.get_nodes()
                 if node.driver.node_active(node)]
        networks_interfaces = nodes[1].interfaces
        for interface in networks_interfaces:
            interface.network.block()
        time.sleep(delay_before_recover)
        for interface in networks_interfaces:
            interface.network.unblock()
        if wait_become_online:
            self.fuel_web.wait_nodes_get_online_state(nodes[1:])

    def uninstall_plugin(self, plugin_name, plugin_version, exit_code=0,
                         msg=None):
        """Remove a plugin.

        :param plugin_name: plugin's name.
        :type plugin_name: str
        :param plugin_version: plugin's version.
        :type plugin_version: str
        :param exit_code: expected exit code.
        :type exit_code: int
        :param msg: message in case of error.
        :type msg: str
        """
        logger.info("Trying to uninstall {name}({version}) plugin".format(
            name=plugin_name, version=plugin_version))
        msg = msg or "Plugin {0} deletion failed: exit code is {1}"
        with self.env.d_env.get_admin_remote() as remote:
            exec_res = remote.execute("fuel plugins --remove"
                                      " {0}=={1}".format(plugin_name,
                                                         plugin_version))
            asserts.assert_equal(
                exit_code, exec_res['exit_code'],
                msg.format(plugin_name, exec_res['exit_code']))
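    # A minimal sketch of update_cluster_settings(): each option key is
    # routed to its settings section automatically, so switching the
    # cluster to Ceph-backed volumes only takes (values are hypothetical):
    #
    #   helper.update_cluster_settings({'volumes_ceph': True,
    #                                   'volumes_lvm': False})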
    def check_plugin_cannot_be_uninstalled(self, plugin_name,
                                           plugin_version):
        """Check that the plugin cannot be uninstalled.

        :param plugin_name: plugin's name.
        :type plugin_name: str
        :param plugin_version: plugin's version.
        :type plugin_version: str
        """
        self.uninstall_plugin(
            plugin_name=plugin_name, plugin_version=plugin_version,
            exit_code=1,
            msg='{name}({version}) plugin deletion must not be allowed '
                'when it is deployed'.format(name=plugin_name,
                                             version=plugin_version))

    def get_hostname_by_node_name(self, changed_node):
        node = self.fuel_web.get_nailgun_node_by_base_name(changed_node)
        if node is None:
            raise NotFound("Nailgun node with '{}' in name not found".format(
                changed_node))
        return node['hostname']

    def fuel_createmirror(self, option="", exit_code=0):
        cmd = "fuel-createmirror {0}".format(option)
        logger.info("Executing '{}' command.".format(cmd))
        with self.env.d_env.get_admin_remote() as remote:
            exec_res = remote.execute(cmd)
            asserts.assert_equal(
                exit_code, exec_res['exit_code'],
                'fuel-createmirror failed: {0}'.format(exec_res['stderr']))

    def replace_ubuntu_mirror_with_mos(self):
        cmds = ["fuel-mirror create -P ubuntu -G mos",
                "fuel-mirror apply --replace -P ubuntu -G mos"]
        logger.info("Executing '{}' commands.".format('\n'.join(cmds)))
        with self.env.d_env.get_admin_remote() as remote:
            for cmd in cmds:
                remote.check_call(cmd)

    def fuel_create_repositories(self, nodes):
        """Start the task that sets up repositories on the provided nodes.

        :param nodes: list of nodes to run the task on.
        :type nodes: list
        """
        nodes_ids = [str(node['id']) for node in nodes]
        cmd = (
            "fuel --env {env_id} "
            "node --node-id {nodes_ids} "
            "--tasks setup_repositories".format(
                env_id=self.cluster_id,
                nodes_ids=' '.join(nodes_ids))
        )
        logger.info("Executing {cmd} command.".format(cmd=cmd))
        with self.env.d_env.get_admin_remote() as remote:
            remote.check_call(cmd)
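    # A minimal sketch of run_tasks() below: tasks can be given explicitly,
    # or computed from a start/end range of the deployment graph (the task
    # names are hypothetical):
    #
    #   helper.run_tasks(nodes, tasks=['netconfig'])
    #   helper.run_tasks(nodes, start='netconfig', end='top-role-murano')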
    def run_tasks(self, nodes, tasks=None, start=None, end=None,
                  timeout=10 * 60):
        """Run a set of tasks on nodes and wait for completion.

        The list of tasks can be provided explicitly using the 'tasks'
        parameter, or specified using the 'start' and/or 'end' parameters.
        In the latter case, the method computes the exact set of tasks to
        be executed.

        :param nodes: list of nodes that should run the tasks.
        :type nodes: list
        :param tasks: list of tasks to run.
        :type tasks: list
        :param start: the task from where to start the deployment.
        :type start: str
        :param end: the task where to end the deployment.
        :type end: str
        :param timeout: number of seconds to wait for the tasks' completion
            (default: 600).
        :type timeout: int
        """
        task_ids = []
        if tasks is not None:
            task_ids += tasks
        if start is not None or end is not None:
            task_ids += [
                t["id"]
                for t in self.nailgun_client.get_end_deployment_tasks(
                    self.cluster_id, end=end or '', start=start or '')]
        node_ids = ",".join([str(node["id"]) for node in nodes])
        logger.info("Running tasks {0} for nodes {1}".format(
            ",".join(task_ids), node_ids))
        result = self.nailgun_client.put_deployment_tasks_for_cluster(
            self.cluster_id, data=task_ids, node_id=node_ids)
        self.fuel_web.assert_task_success(result, timeout=timeout)

    def apply_maintenance_update(self):
        """Apply maintenance updates on the whole cluster."""
        logger.info("Applying maintenance updates on master node")
        self.env.admin_install_updates()

        logger.info("Applying maintenance updates on slaves")
        slaves_mu_script_url = (
            "https://github.com/Mirantis/tools-sustaining/"
            "raw/master/scripts/mos_apply_mu.py")
        path_to_mu_script = "/tmp/mos_apply_mu.py"
        with self.env.d_env.get_admin_remote() as remote:
            remote.check_call("wget {uri} -O {path}".format(
                uri=slaves_mu_script_url, path=path_to_mu_script))
            remote.check_call(
                "python {path} "
                "--env-id={identifier} "
                "--user={username} "
                "--pass={password} "
                "--tenant={tenant_name} --update".format(
                    path=path_to_mu_script,
                    identifier=self.cluster_id,
                    **settings.KEYSTONE_CREDS))

        controllers = self.fuel_web.get_nailgun_cluster_nodes_by_roles(
            self.cluster_id, roles=['controller', ])
        computes = self.fuel_web.get_nailgun_cluster_nodes_by_roles(
            self.cluster_id, roles=['compute', ])

        logger.info("Restarting all OpenStack services")
        logger.info("Restarting services on controllers")
        ha_services = (
            "p_heat-engine",
            "p_neutron-plugin-openvswitch-agent",
            "p_neutron-dhcp-agent",
            "p_neutron-metadata-agent",
            "p_neutron-l3-agent")
        non_ha_services = (
            "heat-api-cloudwatch",
            "heat-api-cfn",
            "heat-api",
            "cinder-api",
            "cinder-scheduler",
            "nova-objectstore",
            "nova-cert",
            "nova-api",
            "nova-consoleauth",
            "nova-conductor",
            "nova-scheduler",
            "nova-novncproxy",
            "neutron-server",
        )
        for controller in controllers:
            with self.fuel_web.get_ssh_for_nailgun_node(
                    controller) as remote:
                for service in ha_services:
                    remote_ops.manage_pacemaker_service(remote, service)
                for service in non_ha_services:
                    remote_ops.manage_initctl_service(remote, service)

        logger.info("Restarting services on computes")
        compute_services = (
            "neutron-plugin-openvswitch-agent",
            "nova-compute",
        )
        for compute in computes:
            with self.fuel_web.get_ssh_for_nailgun_node(compute) as remote:
                for service in compute_services:
                    remote_ops.manage_initctl_service(remote, service)

    @staticmethod
    def check_notifications(got_list, expected_list):
        for event_type in expected_list:
            asserts.assert_true(
                event_type in got_list,
                "{} event type not found in {}".format(
                    event_type, got_list))

    @staticmethod
    def wait_for_resource_status(resource_client, resource, expected_status,
                                 timeout=180, interval=30):
        finish = time.time() + timeout
        while time.time() < finish:
            curr_status = resource_client.get(resource).status
            if curr_status == expected_status:
                return
            logger.debug(
                "Resource is not in {} status yet".format(expected_status))
            time.sleep(interval)
        raise TimeoutException("Timed out waiting for resource to become "
                               "{}".format(expected_status))

    def get_fuel_release(self):
        version = self.nailgun_client.get_api_version()
        return version.get('release')
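    # A minimal sketch of wait_for_resource_status() with a novaclient
    # servers manager (the client and server objects are hypothetical):
    #
    #   PluginHelper.wait_for_resource_status(
    #       os_conn.nova.servers, server.id, 'ACTIVE',
    #       timeout=300, interval=10)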
    def check_pacemaker_resource(self, resource_name, role):
        """Check that the pacemaker resource is started on nodes with the
        given role.

        :param resource_name: the name of the pacemaker resource.
        :type resource_name: str
        :param role: the role of the nodes where pacemaker is running.
        :type role: str
        :returns: None
        """
        cluster_id = self.cluster_id
        n_ctrls = self.fuel_web.get_nailgun_cluster_nodes_by_roles(
            cluster_id, [role])
        d_ctrls = self.fuel_web.get_devops_nodes_by_nailgun_nodes(n_ctrls)
        pcm_nodes = ' '.join(self.fuel_web.get_pcm_nodes(
            d_ctrls[0].name, pure=True)['Online'])
        logger.info("pacemaker nodes are {0}".format(pcm_nodes))
        resource_nodes = self.fuel_web.get_pacemaker_resource_location(
            d_ctrls[0].name, resource_name)
        for resource_node in resource_nodes:
            logger.info("Check resource [{0}] on node {1}".format(
                resource_name, resource_node.name))
            config = self.fuel_web.get_pacemaker_config(resource_node.name)
            asserts.assert_not_equal(
                re.search(
                    r"Clone Set: clone_{0} \[{0}\]\s+Started: "
                    r"\[ {1} \]".format(resource_name, pcm_nodes),
                    config),
                None,
                'Resource [{0}] is not properly configured'.format(
                    resource_name))

    def add_centos_test_proposed_repo(self, repo_url, timestamp):
        cmds = ["yum-config-manager --add-repo {0}{1}/x86_64/".format(
                    repo_url, timestamp),
                "rpm --import {0}{1}/RPM-GPG-KEY-mos9.0".format(
                    repo_url, timestamp)]
        for cmd in cmds:
            self.ssh_manager.check_call(
                ip=self.ssh_manager.admin_ip, command=cmd)

    def add_cluster_repo(self, repo):
        attributes = self.fuel_web.client.get_cluster_attributes(
            self.cluster_id)
        repos_attr = attributes['editable']['repo_setup']['repos']
        repos_attr['value'].append(repo)
        self.fuel_web.client.update_cluster_attributes(
            self.cluster_id, attributes)

    def install_python_cudet(self):
        cmd = "yum install -y python-cudet"
        self.ssh_manager.check_call(
            ip=self.ssh_manager.admin_ip, command=cmd)

    def prepare_update_master_node(self):
        cmds = ['update-prepare prepare master',
                'update-prepare update master']
        for cmd in cmds:
            self.ssh_manager.check_call(
                ip=self.ssh_manager.admin_ip, command=cmd)

    def prepare_for_update(self):
        cmd = "update-prepare prepare env {}".format(self.cluster_id)
        self.ssh_manager.check_call(
            ip=self.ssh_manager.admin_ip, command=cmd)

    def install_mu(self):
        # NOTE: the trailing space after the cluster id placeholder is
        # required; without it the implicit string concatenation glues the
        # id and the --restart-rabbit flag together.
        cmd = "fuel2 update install --env {} " \
              "--restart-rabbit --restart-mysql".format(self.cluster_id)
        std_out = self.ssh_manager.check_call(
            ip=self.ssh_manager.admin_ip, command=cmd).stderr_str
        # The "fuel2 update" command doesn't produce JSON output, so the
        # task id has to be parsed from the `fuel2 task show <id>` hint it
        # prints.
        asserts.assert_true(
            "fuel2 task show" in std_out,
            "fuel2 update command didn't return a task id:\n{}".format(
                std_out))
        task_id = int(std_out.split("fuel2 task show")[1].split("`")[0])
        task = self.fuel_web.client.get_task(task_id)
        self.assert_cli_task_success(task, timeout=120 * 60)
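    # assert_cli_task_success() below polls the Nailgun API until the task
    # leaves the 'pending'/'running' states, then asserts it ended up
    # 'ready'; a minimal sketch (the task id is hypothetical):
    #
    #   task = helper.fuel_web.client.get_task(42)
    #   helper.assert_cli_task_success(task, timeout=60 * 60)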
    def assert_cli_task_success(self, task, timeout=70 * 60, interval=20):
        logger.info('Wait {timeout} seconds for task: {task}'.format(
            timeout=timeout, task=task))
        start = time.time()
        wait(
            lambda: (self.fuel_web.client.get_task(task['id'])['status']
                     not in ('pending', 'running')),
            interval=interval,
            timeout=timeout,
            timeout_msg='Waiting timeout {timeout} sec was reached '
                        'for task: {task}'.format(task=task["name"],
                                                  timeout=timeout))
        took = time.time() - start
        task = self.fuel_web.client.get_task(task['id'])
        logger.info('Task finished in {took} seconds with the result: '
                    '{task}'.format(took=took, task=task))
        asserts.assert_equal(
            task['status'], 'ready',
            "Task '{name}' has incorrect status. {status} != {exp}".format(
                status=task['status'], exp='ready', name=task["name"]))

    def check_update(self):
        cmd = "cudet -e {}".format(self.cluster_id)
        std_out = self.ssh_manager.execute_on_remote(
            ip=self.ssh_manager.admin_ip, cmd=cmd)['stdout']
        asserts.assert_true("ALL NODES UP-TO-DATE" in std_out[-1],
                            "Cluster wasn't updated")

    def get_plugin_pid(self, service_name):
        """Return the 'ps' output lines matching the given service name."""
        controller_ip = self.fuel_web.get_nailgun_cluster_nodes_by_roles(
            self.cluster_id, ['murano-node'])[0]['ip']
        ps_output = self.ssh_manager.execute_on_remote(
            ip=controller_ip, cmd='ps ax')['stdout']
        api = [ps for ps in ps_output if service_name in ps]
        return api

    def compare_pid(self, old_pid, new_pid):
        asserts.assert_equal(old_pid, new_pid,
                             'PID has changed after executing '
                             'setup_repositories command')

    def add_update_repo(self):
        ip = self.ssh_manager.admin_ip
        logger.info("Adding update mirror")
        cmd = ("yum-config-manager --add-repo=http://mirror.fuel-infra.org/"
               "mos-repos/centos/mos9.0-centos7/updates/x86_64/")
        self.ssh_manager.execute_on_remote(ip, cmd,
                                           err_msg="Adding repo failed")
        logger.info("Importing GPG keys")
        cmd = ("rpm --import http://mirror.fuel-infra.org/mos-repos/"
               "centos/mos9.0-centos7/updates/RPM-GPG-KEY-mos9.0")
        self.ssh_manager.execute_on_remote(ip, cmd,
                                           err_msg="GPG keys import failed")
        logger.info("Cleaning yum cache")
        cmd = "yum clean all"
        self.ssh_manager.execute_on_remote(
            ip, cmd, err_msg="yum cache flush unsuccessful")

    def wait_os_cluster_readiness(self, timeout=15 * 60):
        self.fuel_web.assert_os_services_ready(self.cluster_id,
                                               timeout=timeout)
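
# A minimal end-to-end sketch of how this helper is typically driven from a
# test case (the environment object, package filename, cluster name and
# node layout below are hypothetical):
#
#   helper = PluginHelper(env)
#   helper.prepare_plugin('/tmp/murano-plugin-1.0-1.0.0.rpm')
#   helper.create_cluster(name='murano_smoke')
#   helper.activate_plugin(
#       get_plugin_name('murano-plugin-1.0-1.0.0.rpm'),
#       get_plugin_version('murano-plugin-1.0-1.0.0.rpm'))
#   helper.deploy_cluster({'slave-01': ['controller'],
#                          'slave-02': ['compute']})
#   helper.run_ostf()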