from os import path
import socket
import time

import six

from rally.common import logging
from rally.common import sshutils
from rally import consts
from rally import exceptions
from rally.plugins.openstack import scenario
from rally.plugins.openstack.scenarios.cinder import utils as cinder_utils
from rally.plugins.openstack.scenarios.vm import utils as vm_utils
from rally.task import atomic
from rally.task import types
from rally.task import utils as task_utils
from rally.task import validation

LOG = logging.getLogger(__name__)


def failover(self, host, command, port=22, username="", password="",
             key_filename=None, pkey=None):
    """Trigger failover on a host.

    :param host: compute host IP address or hostname
    :param command: dictionary specifying the disruptor command to
                    execute, in the format accepted by
                    _run_command_over_ssh
    :param port: ssh port of the host
    :param username: ssh username for the host
    :param password: password for SSH authentication
    :param key_filename: private key filename for SSH authentication
    :param pkey: private key for SSH authentication
    """
    if key_filename:
        key_filename = path.expanduser(key_filename)
    LOG.info("Host: %s. Injecting failover %s" % (host, command))
    try:
        code, out, err = _run_command(self, server_ip=host, port=port,
                                      username=username, password=password,
                                      key_filename=key_filename, pkey=pkey,
                                      command=command)
        if code and code > 0:
            raise exceptions.ScriptError(
                "Error running command %(command)s. "
                "Error %(code)s: %(error)s" % {
                    "command": command, "code": code, "error": err})
    except exceptions.SSHTimeout:
        # The disruptor takes the host down, so the SSH session is
        # expected to time out; continue with the scenario.
        LOG.debug("SSH session of disruptor command timed out, "
                  "continuing...")


def _run_command(self, server_ip, port, username, password, command,
                 pkey=None, key_filename=None):
    """Run command via SSH on server.

    Create an SSH connection to the server and wait for the server to
    become available (there is a delay between the server being set to
    ACTIVE and sshd being available). Then call _run_command_over_ssh to
    actually execute the command.

    Note: Shadows vm.utils.VMScenario._run_command to support
    key_filename.

    :param server_ip: server IP address
    :param port: ssh port for SSH connection
    :param username: ssh username for the server
    :param password: password for SSH authentication
    :param command: dictionary specifying the command to execute. See
                    `rally info find VMTasks.boot_runcommand_delete`
                    parameter `command` docstring for explanation.
    :param pkey: private key for SSH authentication
    :param key_filename: private key filename for SSH authentication
    :returns: tuple (exit_status, stdout, stderr)
    """
    if not key_filename:
        pkey = pkey or self.context["user"]["keypair"]["private"]
    ssh = sshutils.SSH(username, server_ip, port=port,
                       pkey=pkey, password=password,
                       key_filename=key_filename)
    self._wait_for_ssh(ssh)
    return _run_command_over_ssh(self, ssh, command)


@atomic.action_timer("vm.run_command_over_ssh")
def _run_command_over_ssh(self, ssh, command):
    """Run command inside an instance.

    This is a separate function so that only script execution is timed.

    :param ssh: an SSHClient instance
    :param command: dictionary specifying the command to execute. See
                    `rally info find VMTasks.boot_runcommand_delete`
                    parameter `command` docstring for explanation.
    :returns: tuple (exit_status, stdout, stderr)
    """
    cmd, stdin = [], None

    interpreter = command.get("interpreter") or []
    if interpreter:
        if isinstance(interpreter, six.string_types):
            interpreter = [interpreter]
        elif not isinstance(interpreter, list):
            raise ValueError("command 'interpreter' value must be str "
                             "or list type")
        cmd.extend(interpreter)

    remote_path = command.get("remote_path") or []
    if remote_path:
        if isinstance(remote_path, six.string_types):
            remote_path = [remote_path]
        elif not isinstance(remote_path, list):
            raise ValueError("command 'remote_path' value must be str "
                             "or list type")
        cmd.extend(remote_path)
        if command.get("local_path"):
            ssh.put_file(path.expanduser(command["local_path"]),
                         remote_path[-1],
                         mode=self.USER_RWX_OTHERS_RX_ACCESS_MODE)

    if command.get("script_file"):
        stdin = open(path.expanduser(command["script_file"]), "rb")
    elif command.get("script_inline"):
        stdin = six.moves.StringIO(command["script_inline"])

    cmd.extend(command.get("command_args") or [])

    # Use a short timeout: a disruptor command takes the host down, so a
    # long-running SSH session would only block the scenario.
    return ssh.execute(cmd, stdin=stdin, timeout=10)
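# Illustrative only: the `command` dictionaries handled above follow the
# VMTasks.boot_runcommand_delete format. A minimal inline script looks like
#
#     command = {
#         "script_inline": "uname -a",
#         "interpreter": "/bin/sh",
#     }
#
# while pushing a local script to the guest would combine "local_path",
# "remote_path" and, optionally, "command_args".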
def one_killing_iteration(self, server, fip, computes, disruptor_cmd,
                          stop_instance):
    """Run a single disruption iteration.

    Find the host where the instance is hosted, disrupt that host and
    verify the status of the instance after the failover.
    """
    server_admin = self.admin_clients("nova").servers.get(server.id)
    host_name_pre = getattr(server_admin, "OS-EXT-SRV-ATTR:host")
    host_name_ext = host_name_pre.split(".")[0] + ".external"
    hypervisors = self.admin_clients("nova").hypervisors.list()
    hostnames = []
    for hypervisor in hypervisors:
        hostnames.append(getattr(hypervisor, "hypervisor_hostname"))
        if getattr(hypervisor, "hypervisor_hostname") == host_name_pre:
            hypervisor_id = getattr(hypervisor, "id")
    hypervisor = self.admin_clients("nova").hypervisors.get(hypervisor_id)
    hypervisor_ip = socket.gethostbyname(host_name_ext.strip())

    if not disruptor_cmd:
        # Default disruptor: sysrq 'b' reboots the node immediately,
        # without syncing or unmounting disks.
        disruptor_cmd = {
            "script_inline": "sudo sh -c \"echo b > /proc/sysrq-trigger\"",
            "interpreter": "/bin/sh"
        }

    # Trigger failover of the compute node hosting the instance
    failover(self, host=hypervisor_ip, command=disruptor_cmd,
             port=computes.get("port", 22),
             username=computes.get("username"),
             password=computes.get("password"),
             key_filename=computes.get("key_filename"),
             pkey=computes.get("pkey"))

    # Wait for the instance to be moved to a different host
    hostnames.remove(host_name_pre)
    task_utils.wait_for(
        server_admin,
        status_attr="OS-EXT-SRV-ATTR:host",
        ready_statuses=hostnames,
        update_resource=task_utils.get_from_manager(),
        timeout=120,
        check_interval=5)

    # Check that the instance is SHUTOFF in the case of a stopped
    # instance, or that the instance is pingable otherwise
    if stop_instance:
        task_utils.wait_for(
            server,
            ready_statuses=["SHUTOFF"],
            update_resource=task_utils.get_from_manager(),
            timeout=60,
            check_interval=2)
    else:
        try:
            if self.wait_for_ping:
                self._wait_for_ping(fip["ip"])
        except exceptions.TimeoutException:
            console_logs = self._get_server_console_output(server, None)
            LOG.debug("VM console logs:\n%s", console_logs)
            raise
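# A sketch of a custom disruptor (the exact command is an assumption and is
# deployment-specific): any dictionary understood by _run_command_over_ssh
# can be passed as `disruptor_cmd`, e.g. killing only the nova-compute
# process instead of crashing the whole node, in the spirit of
# InstanceHARecoverNovaCompute below:
#
#     disruptor_cmd = {
#         "script_inline": "sudo pkill -9 -f nova-compute",
#         "interpreter": "/bin/sh",
#     }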
def recover_instance_ha(self, image, flavor, computes,
                        volume_args=None,
                        floating_network=None,
                        use_floating_ip=True,
                        force_delete=False,
                        stop_instance=False,
                        disruptor_cmd=None,
                        iterations=1,
                        wait_for_ping=True,
                        max_log_length=None,
                        **kwargs):
    """Boot a server, trigger failover of its host and verify the instance.

    :param image: glance image name to use for the VM
    :param flavor: VM flavor name
    :param computes: dictionary with credentials for the compute nodes,
                     consisting of username, password, port, key_filename
                     and pkey. Example::

                         computes: {
                             username: heat-admin,
                             key_filename: /path/to/ssh/id_rsa,
                             port: 22
                         }

    :param volume_args: volume args for booting the server from a volume
    :param floating_network: external network name, for floating IP
    :param use_floating_ip: bool, floating or fixed IP for SSH connection
    :param force_delete: whether to use force_delete for servers
    :param stop_instance: whether to stop the instance before the
                          disruptor command
    :param disruptor_cmd: command to be sent to the hosting compute node
    :param iterations: number of compute node killing iterations
    :param wait_for_ping: whether to check connectivity on server creation
    :param max_log_length: the number of tail nova console-log lines to
                           retrieve
    :param kwargs: extra arguments for booting the server
    """
    self.wait_for_ping = wait_for_ping

    if volume_args:
        volume = self.cinder.create_volume(volume_args["size"],
                                           imageRef=None)
        kwargs["block_device_mapping"] = {"vdrally": "%s:::1" % volume.id}

    server, fip = self._boot_server_with_fip(
        image, flavor,
        use_floating_ip=use_floating_ip,
        floating_network=floating_network,
        key_name=self.context["user"]["keypair"]["name"],
        **kwargs)
    task_utils.wait_for(
        server,
        ready_statuses=["ACTIVE"],
        update_resource=task_utils.get_from_manager(),
        timeout=120,
        check_interval=2)

    try:
        if self.wait_for_ping:
            self._wait_for_ping(fip["ip"])
    except exceptions.TimeoutException:
        console_logs = self._get_server_console_output(server,
                                                       max_log_length)
        LOG.debug("VM console logs:\n%s", console_logs)
        raise

    if stop_instance:
        self._stop_server(server)
        task_utils.wait_for(
            server,
            ready_statuses=["SHUTOFF"],
            update_resource=task_utils.get_from_manager(),
            timeout=120,
            check_interval=2)

    # Wait a little before killing the compute node; killing it right
    # away can corrupt the backing image, which was reported as a bug.
    time.sleep(30)

    for iteration in range(1, iterations + 1):
        one_killing_iteration(self, server, fip, computes, disruptor_cmd,
                              stop_instance)
        # Give the cluster some time to recover the original compute node
        LOG.info("Waiting for compute nodes to come back online after "
                 "the disruption")
        time.sleep(360)

    if stop_instance:
        # Start the instance again if it was stopped
        self._start_server(server)
        task_utils.wait_for(
            server,
            ready_statuses=["ACTIVE"],
            update_resource=task_utils.get_from_manager(),
            timeout=120,
            check_interval=2)

    self._delete_server_with_fip(server, fip, force_delete=force_delete)
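# A minimal task snippet invoking one of the scenarios below (illustrative
# values; image, flavor, network and credential paths depend on the
# deployment):
#
#     InstanceHA.recover_instance_fip_and_volume:
#       - args:
#           image: {name: "cirros"}
#           flavor: {name: "m1.small"}
#           computes:
#             username: "heat-admin"
#             key_filename: "/home/stack/.ssh/id_rsa"
#             port: 22
#           volume_args: {size: 1}
#           floating_network: "public"
#         runner: {type: "constant", times: 1, concurrency: 1}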
@types.convert(image={"type": "glance_image"},
               flavor={"type": "nova_flavor"})
@validation.image_valid_on_flavor("flavor", "image")
@validation.valid_command("command", required=False)
@validation.number("port", minval=1, maxval=65535,
                   nullable=True, integer_only=True)
@validation.external_network_exists("floating_network")
@validation.required_services(consts.Service.NOVA, consts.Service.CINDER)
@validation.required_openstack(users=True, admin=True)
@scenario.configure(context={"cleanup@openstack": ["nova", "cinder"],
                             "keypair@openstack": {},
                             "allow_ssh@openstack": None},
                    name="InstanceHA.recover_instance_fip_and_volume",
                    platform="openstack")
class InstanceHARecoverFIPAndVolume(vm_utils.VMScenario,
                                    cinder_utils.CinderBasic):

    def __init__(self, *args, **kwargs):
        super(InstanceHARecoverFIPAndVolume, self).__init__(*args, **kwargs)

    def run(self, image, flavor, computes, volume_args=None,
            floating_network=None, use_floating_ip=True,
            force_delete=False, wait_for_ping=True,
            max_log_length=None, **kwargs):
        recover_instance_ha(self, image, flavor, computes,
                            volume_args=volume_args,
                            floating_network=floating_network,
                            use_floating_ip=use_floating_ip,
                            force_delete=force_delete,
                            wait_for_ping=wait_for_ping,
                            max_log_length=max_log_length,
                            **kwargs)


@types.convert(image={"type": "glance_image"},
               flavor={"type": "nova_flavor"})
@validation.image_valid_on_flavor("flavor", "image")
@validation.valid_command("command", required=False)
@validation.number("port", minval=1, maxval=65535,
                   nullable=True, integer_only=True)
@validation.external_network_exists("floating_network")
@validation.required_services(consts.Service.NOVA, consts.Service.CINDER)
@validation.required_openstack(users=True, admin=True)
@scenario.configure(context={"cleanup@openstack": ["nova", "cinder"],
                             "keypair@openstack": {},
                             "allow_ssh@openstack": None},
                    name="InstanceHA.recover_instance_two_cycles",
                    platform="openstack")
class InstanceHARecoverTwoCycle(vm_utils.VMScenario,
                                cinder_utils.CinderBasic):

    def __init__(self, *args, **kwargs):
        super(InstanceHARecoverTwoCycle, self).__init__(*args, **kwargs)

    def run(self, image, flavor, computes, volume_args=None,
            floating_network=None, use_floating_ip=True,
            force_delete=False, wait_for_ping=True,
            max_log_length=None, **kwargs):
        recover_instance_ha(self, image, flavor, computes,
                            volume_args=volume_args,
                            floating_network=floating_network,
                            use_floating_ip=use_floating_ip,
                            force_delete=force_delete,
                            iterations=2,
                            wait_for_ping=wait_for_ping,
                            max_log_length=max_log_length,
                            **kwargs)


@types.convert(image={"type": "glance_image"},
               flavor={"type": "nova_flavor"})
@validation.image_valid_on_flavor("flavor", "image")
@validation.valid_command("command", required=False)
@validation.number("port", minval=1, maxval=65535,
                   nullable=True, integer_only=True)
@validation.external_network_exists("floating_network")
@validation.required_services(consts.Service.NOVA, consts.Service.CINDER)
@validation.required_openstack(users=True, admin=True)
@scenario.configure(context={"cleanup@openstack": ["nova", "cinder"],
                             "keypair@openstack": {},
                             "allow_ssh@openstack": None},
                    name="InstanceHA.recover_stopped_instance_fip",
                    platform="openstack")
class InstanceHARecoverStopped(vm_utils.VMScenario,
                               cinder_utils.CinderBasic):

    def __init__(self, *args, **kwargs):
        super(InstanceHARecoverStopped, self).__init__(*args, **kwargs)

    def run(self, image, flavor, computes, volume_args=None,
            floating_network=None, use_floating_ip=True,
            force_delete=False, wait_for_ping=True,
            max_log_length=None, **kwargs):
        recover_instance_ha(self, image, flavor, computes,
                            volume_args=volume_args,
                            floating_network=floating_network,
                            use_floating_ip=use_floating_ip,
                            force_delete=force_delete,
                            stop_instance=True,
                            wait_for_ping=wait_for_ping,
                            max_log_length=max_log_length,
                            **kwargs)
@types.convert(image={"type": "glance_image"},
               flavor={"type": "nova_flavor"})
@validation.image_valid_on_flavor("flavor", "image")
@validation.valid_command("command", required=False)
@validation.number("port", minval=1, maxval=65535,
                   nullable=True, integer_only=True)
@validation.external_network_exists("floating_network")
@validation.required_services(consts.Service.NOVA, consts.Service.CINDER)
@validation.required_openstack(users=True, admin=True)
@scenario.configure(context={"cleanup@openstack": ["nova", "cinder"],
                             "keypair@openstack": {},
                             "allow_ssh@openstack": None},
                    name="InstanceHA.recover_instance_nova_compute",
                    platform="openstack")
class InstanceHARecoverNovaCompute(vm_utils.VMScenario,
                                   cinder_utils.CinderBasic):

    def __init__(self, *args, **kwargs):
        super(InstanceHARecoverNovaCompute, self).__init__(*args, **kwargs)

    def run(self, image, flavor, computes, volume_args=None,
            floating_network=None, use_floating_ip=True,
            force_delete=False, wait_for_ping=True,
            max_log_length=None, **kwargs):
        disruptor_cmd = {
            "script_inline": "sudo kill -9 $(ps -ef | grep ^nova* "
                             "| awk '{print$2}'); echo {}",
            "interpreter": "/bin/sh"
        }
        recover_instance_ha(self, image, flavor, computes,
                            volume_args=volume_args,
                            floating_network=floating_network,
                            use_floating_ip=use_floating_ip,
                            force_delete=force_delete,
                            disruptor_cmd=disruptor_cmd,
                            wait_for_ping=wait_for_ping,
                            max_log_length=max_log_length,
                            **kwargs)
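# Likewise illustrative, a task entry for the nova-compute variant; no
# disruptor_cmd is needed in the task file because the kill command above
# is built in:
#
#     InstanceHA.recover_instance_nova_compute:
#       - args:
#           image: {name: "cirros"}
#           flavor: {name: "m1.small"}
#           computes: {username: "heat-admin",
#                      key_filename: "/home/stack/.ssh/id_rsa"}
#         runner: {type: "constant", times: 1, concurrency: 1}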