#!/usr/bin/python3 # # SPDX-License-Identifier: Apache-2.0 # import os import subprocess import re import getpass import time from sys import platform from consts import env from utils.install_log import LOG def vboxmanage_version(): """ Return version of vbox. """ version = subprocess.check_output(['vboxmanage', '--version'], stderr=subprocess.STDOUT) return version def vboxmanage_extpack(action="install"): """ This allows you to install, uninstall the vbox extensions" """ output = vboxmanage_version() version = re.match(b'(.*)r', output) version_path = version.group(1).decode('utf-8') LOG.info("Downloading extension pack") filename = 'Oracle_VM_VirtualBox_Extension_Pack-{}.vbox-extpack'.format(version_path) cmd = 'http://download.virtualbox.org/virtualbox/{}/{}'.format(version_path, filename) result = subprocess.check_output(['wget', cmd, '-P', '/tmp'], stderr=subprocess.STDOUT) LOG.info(result) LOG.info("Installing extension pack") result = subprocess.check_output(['vboxmanage', 'extpack', 'install', '/tmp/' + filename, '--replace'], stderr=subprocess.STDOUT) LOG.info(result) def get_all_vms(labname, option="vms"): initial_node_list = [] vm_list = vboxmanage_list(option) labname.encode('utf-8') # Reduce the number of VMs we query for item in vm_list: if labname.encode('utf-8') in item and (b'controller-' in item or \ b'compute-' in item or b'storage-' in item): initial_node_list.append(item.decode('utf-8')) # Filter by group node_list = [] group = bytearray('"/{}"'.format(labname), 'utf-8') for item in initial_node_list: info = vboxmanage_showinfo(item).splitlines() for line in info: try: k, v = line.split(b'=') except ValueError: continue if k == b'groups' and v == group: node_list.append(item) return node_list def take_snapshot(labname, snapshot_name, socks=None): vms = get_all_vms(labname, option="vms") runningvms = get_all_vms(labname, option="runningvms") LOG.info("#### Taking snapshot %s of lab %s", snapshot_name, labname) LOG.info("VMs in lab %s: %s", labname, vms) LOG.info("VMs running in lab %s: %s", labname, runningvms) hosts = len(vms) # Pause running VMs to take snapshot if len(runningvms) > 1: for node in runningvms: newpid = os.fork() if newpid == 0: vboxmanage_controlvms([node], "pause") os._exit(0) for node in vms: os.waitpid(0, 0) time.sleep(2) if hosts != 0: vboxmanage_takesnapshot(vms, snapshot_name) # Resume VMs after snapshot was taken if len(runningvms) > 1: for node in runningvms: newpid = os.fork() if newpid == 0: vboxmanage_controlvms([node], "resume") os._exit(0) for node in runningvms: os.waitpid(0, 0) time.sleep(10) # Wait for VM serial port to stabilize, otherwise it may refuse to connect if runningvms: new_vms = get_all_vms(labname, option="runningvms") retry = 0 while retry < 20: LOG.info("Waiting for VMs to come up running after taking snapshot..." "Up VMs are %s ", new_vms) if len(runningvms) < len(new_vms): time.sleep(1) new_vms = get_all_vms(labname, option="runningvms") retry += 1 else: LOG.info("All VMs %s are up running after taking snapshot...", vms) break def restore_snapshot(node_list, name): LOG.info("Restore snapshot of %s for hosts %s", name, node_list) if len(node_list) != 0: vboxmanage_controlvms(node_list, "poweroff") time.sleep(5) if len(node_list) != 0: for host in node_list: vboxmanage_restoresnapshot(host, name) time.sleep(5) for host in node_list: if "controller-0" not in host: vboxmanage_startvm(host) time.sleep(10) for host in node_list: if "controller-0" in host: vboxmanage_startvm(host) time.sleep(10) def vboxmanage_list(option="vms"): """ This returns a list of vm names. """ result = subprocess.check_output(['vboxmanage', 'list', option], stderr=subprocess.STDOUT) vms_list = [] for item in result.splitlines(): vm_name = re.match(b'"(.*?)"', item) vms_list.append(vm_name.group(1)) return vms_list def vboxmanage_showinfo(host): """ This returns info about the host """ if not isinstance(host, str): host.decode('utf-8') result = subprocess.check_output(['vboxmanage', 'showvminfo', host, '--machinereadable'], stderr=subprocess.STDOUT) return result def vboxmanage_createvm(hostname, labname): """ This creates a VM with the specified name. """ assert hostname, "Hostname is required" assert labname, "Labname is required" group = "/" + labname LOG.info("Creating VM %s", hostname) result = subprocess.check_output(['vboxmanage', 'createvm', '--name', hostname, '--register', '--ostype', 'Linux_64', '--groups', group], stderr=subprocess.STDOUT) def vboxmanage_deletevms(hosts=None): """ Deletes a list of VMs """ assert hosts, "A list of hostname(s) is required" if len(hosts) != 0: for hostname in hosts: LOG.info("Deleting VM %s", hostname) result = subprocess.check_output(['vboxmanage', 'unregistervm', hostname, '--delete'], stderr=subprocess.STDOUT) time.sleep(10) # in case medium is still present after delete vboxmanage_deletemedium(hostname) vms_list = vboxmanage_list("vms") for items in hosts: assert items not in vms_list, "The following vms are unexpectedly" \ "present {}".format(vms_list) def vboxmanage_hostonlyifcreate(name="vboxnet0", ip=None, netmask=None): """ This creates a hostonly network for systems to communicate. """ assert name, "Must provide network name" assert ip, "Must provide an OAM IP" assert netmask, "Must provide an OAM Netmask" LOG.info("Creating Host-only Network") result = subprocess.check_output(['vboxmanage', 'hostonlyif', 'create'], stderr=subprocess.STDOUT) LOG.info("Provisioning %s with IP %s and Netmask %s", name, ip, netmask) result = subprocess.check_output(['vboxmanage', 'hostonlyif', 'ipconfig', name, '--ip', ip, '--netmask', netmask], stderr=subprocess.STDOUT) def vboxmanage_hostonlyifdelete(name="vboxnet0"): """ Deletes hostonly network. This is used as a work around for creating too many hostonlyifs. """ assert name, "Must provide network name" LOG.info("Removing Host-only Network") result = subprocess.check_output(['vboxmanage', 'hostonlyif', 'remove', name], stderr=subprocess.STDOUT) def vboxmanage_modifyvm(hostname=None, cpus=None, memory=None, nic=None, nictype=None, nicpromisc=None, nicnum=None, intnet=None, hostonlyadapter=None, natnetwork=None, uartbase=None, uartport=None, uartmode=None, uartpath=None, nicbootprio2=1, prefix=""): """ This modifies a VM with a specified name. """ assert hostname, "Hostname is required" # Add more semantic checks cmd = ['vboxmanage', 'modifyvm', hostname] if cpus: cmd.extend(['--cpus', cpus]) if memory: cmd.extend(['--memory', memory]) if nic and nictype and nicpromisc and nicnum: cmd.extend(['--nic{}'.format(nicnum), nic]) cmd.extend(['--nictype{}'.format(nicnum), nictype]) cmd.extend(['--nicpromisc{}'.format(nicnum), nicpromisc]) if intnet: if prefix: intnet = "{}-{}".format(prefix, intnet) else: intnet = "{}".format(intnet) cmd.extend(['--intnet{}'.format(nicnum), intnet]) if hostonlyadapter: cmd.extend(['--hostonlyadapter{}'.format(nicnum), hostonlyadapter]) if natnetwork: cmd.extend(['--nat-network{}'.format(nicnum), natnetwork]) elif nicnum and nictype == 'nat': cmd.extend(['--nic{}'.format(nicnum), 'nat']) if uartbase and uartport and uartmode and uartpath: cmd.extend(['--uart1']) cmd.extend(['{}'.format(uartbase)]) cmd.extend(['{}'.format(uartport)]) cmd.extend(['--uartmode1']) cmd.extend(['{}'.format(uartmode)]) if platform == 'win32' or platform == 'win64': cmd.extend(['{}'.format(env.PORT)]) env.PORT += 1 else: if prefix: prefix = "{}_".format(prefix) if 'controller-0' in hostname: cmd.extend(['{}{}{}_serial'.format(uartpath, prefix, hostname)]) else: cmd.extend(['{}{}{}'.format(uartpath, prefix, hostname)]) if nicbootprio2: cmd.extend(['--nicbootprio2']) cmd.extend(['{}'.format(nicbootprio2)]) cmd.extend(['--boot4']) cmd.extend(['net']) LOG.info(cmd) LOG.info("Updating VM %s configuration", hostname) result = subprocess.check_output(cmd, stderr=subprocess.STDOUT) def vboxmanage_port_forward(hostname, network, local_port, guest_port, guest_ip): # VBoxManage natnetwork modify --netname natnet1 --port-forward-4 # "ssh:tcp:[]:1022:[192.168.15.5]:22" rule_name = "{}-{}".format(hostname, guest_port) # Delete previous entry, if any LOG.info("Removing previous forwarding rule '%s' from NAT network '%s'", rule_name, network) cmd = ['vboxmanage', 'natnetwork', 'modify', '--netname', network, '--port-forward-4', 'delete', rule_name] try: result = subprocess.check_output(cmd, stderr=subprocess.STDOUT) except subprocess.CalledProcessError: pass # Add new rule rule = "{}:tcp:[]:{}:[{}]:{}".format(rule_name, local_port, guest_ip, guest_port) LOG.info("Updating port-forwarding rule to: %s", rule) cmd = ['vboxmanage', 'natnetwork', 'modify', '--netname', network, '--port-forward-4', rule] result = subprocess.check_output(cmd, stderr=subprocess.STDOUT) def vboxmanage_storagectl(hostname=None, storectl="sata", hostiocache="off"): """ This creates a storage controller on the host. """ assert hostname, "Hostname is required" assert storectl, "Type of storage controller is required" LOG.info("Creating %s storage controller on VM %s", storectl, hostname) result = subprocess.check_output(['vboxmanage', 'storagectl', hostname, '--name', storectl, '--add', storectl, '--hostiocache', hostiocache], stderr=subprocess.STDOUT) def vboxmanage_storageattach(hostname=None, storectl="sata", storetype="hdd", disk=None, port_num="0", device_num="0"): """ This attaches a disk to a controller. """ assert hostname, "Hostname is required" assert disk, "Disk name is required" assert storectl, "Name of storage controller is required" assert storetype, "Type of storage controller is required" LOG.info("Attaching %s storage to storage controller %s on VM %s", storetype, storectl, hostname) result = subprocess.check_output(['vboxmanage', 'storageattach', hostname, '--storagectl', storectl, '--medium', disk, '--type', storetype, '--port', port_num, '--device', device_num], stderr=subprocess.STDOUT) return result def vboxmanage_deletemedium(hostname, vbox_home_dir='/home'): assert hostname, "Hostname is required" if platform == 'win32' or platform == 'win64': return username = getpass.getuser() vbox_home_dir = "{}/{}/vbox_disks/".format(vbox_home_dir, username) disk_list = [f for f in os.listdir(vbox_home_dir) if os.path.isfile(os.path.join(vbox_home_dir, f)) and hostname in f] LOG.info("Disk mediums to delete: %s", disk_list) for disk in disk_list: LOG.info("Disconnecting disk %s from vbox.", disk) try: result = subprocess.check_output(['vboxmanage', 'closemedium', 'disk', "{}{}".format(vbox_home_dir, disk), '--delete'], stderr=subprocess.STDOUT) LOG.info(result) except subprocess.CalledProcessError as e: # Continue if failures, disk may not be present LOG.info("Error disconnecting disk, continuing. " "Details: stdout: %s stderr: %s", e.stdout, e.stderr) LOG.info("Removing backing file %s", disk) try: os.remove("{}{}".format(vbox_home_dir, disk)) except: pass def vboxmanage_createmedium(hostname=None, disk_list=None, vbox_home_dir='/home'): """ This creates the required disks. """ assert hostname, "Hostname is required" assert disk_list, "A list of disk sizes is required" username = getpass.getuser() device_num = 0 port_num = 0 disk_count = 1 for disk in disk_list: if platform == 'win32' or platform == 'win64': file_name = "C:\\Users\\" + username + "\\vbox_disks\\" + \ hostname + "_disk_{}".format(disk_count) else: file_name = vbox_home_dir + '/' + username + "/vbox_disks/" \ + hostname + "_disk_{}".format(disk_count) LOG.info("Creating disk %s of size %s on VM %s on device %s port %s", file_name, disk, hostname, device_num, port_num) try: result = subprocess.check_output(['vboxmanage', 'createmedium', 'disk', '--size', str(disk), '--filename', file_name, '--format', 'vdi', '--variant', 'standard'], stderr=subprocess.STDOUT) LOG.info(result) except subprocess.CalledProcessError as e: LOG.info("Error stdout: %s stderr: %s", e.stdout, e.stderr) raise vboxmanage_storageattach(hostname, "sata", "hdd", file_name + \ ".vdi", str(port_num), str(device_num)) disk_count += 1 port_num += 1 time.sleep(5) def vboxmanage_startvm(hostname=None, force=False): """ This allows you to power on a VM. """ assert hostname, "Hostname is required" if not force: LOG.info("Check if VM is running") running_vms = vboxmanage_list(option="runningvms") else: running_vms = [] if hostname.encode('utf-8') in running_vms: LOG.info("Host %s is already started", hostname) else: LOG.info("Powering on VM %s", hostname) result = subprocess.check_output(['vboxmanage', 'startvm', hostname], stderr=subprocess.STDOUT) LOG.info(result) # Wait for VM to start tmout = 20 while tmout: tmout -= 1 running_vms = vboxmanage_list(option="runningvms") if hostname.encode('utf-8') in running_vms: break time.sleep(1) else: raise "Failed to start VM: {}".format(hostname) LOG.info("VM '%s' started.", hostname) def vboxmanage_controlvms(hosts=None, action=None): """ This allows you to control a VM, e.g. pause, resume, etc. """ assert hosts, "Hostname is required" assert action, "Need to provide an action to execute" for host in hosts: LOG.info("Executing %s action on VM %s", action, host) result = subprocess.call(["vboxmanage", "controlvm", host, action], stderr=subprocess.STDOUT) time.sleep(1) def vboxmanage_takesnapshot(hosts=None, name=None): """ This allows you to take snapshot of VMs. """ assert hosts, "Hostname is required" assert name, "Need to provide a name for the snapshot" for host in hosts: LOG.info("Taking snapshot %s on VM %s", name, host) result = subprocess.call(["vboxmanage", "snapshot", host, "take", name], stderr=subprocess.STDOUT) def vboxmanage_restoresnapshot(host=None, name=None): """ This allows you to restore snapshot of a VM. """ assert host, "Hostname is required" assert name, "Need to provide the snapshot to restore" LOG.info("Restoring snapshot %s on VM %s", name, host) result = subprocess.call(["vboxmanage", "snapshot", host, "restore", name], stderr=subprocess.STDOUT) time.sleep(10)