132113d247
Before creating the labs, the code tries to create the NatNetwork: if it already exists, creation is skipped; if it doesn't, the NatNetwork is created using the subnet defined in localhost.yaml as the CIDR. Test Plan: PASS: If the NatNetwork doesn't exist, it is successfully created. PASS: If the NatNetwork exists with a different CIDR, installation is terminated. Story: 2005051 Task: 48920 Change-Id: I80e361e66c5a936da019aa55ff7aa26643819b6f Signed-off-by: Daniel Caires <daniel.caires@encora.com>
996 lines
28 KiB
Python
996 lines
28 KiB
Python
#!/usr/bin/python3
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
|
|
"""
|
|
This module provides functions for managing virtual machines using VirtualBox.
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
import re
|
|
import getpass
|
|
import time
|
|
|
|
from sys import platform
|
|
from utils.install_log import LOG
|
|
|
|
|
|
def vboxmanage_version():
    """
    Query the installed VirtualBox version.

    Returns:
        bytes: Raw output of "vboxmanage --version" (stderr is merged into stdout).
    """

    return subprocess.check_output(
        ["vboxmanage", "--version"], stderr=subprocess.STDOUT
    )
|
|
|
|
|
|
def vboxmanage_extpack():
    """
    Download and install the VirtualBox extension pack for the installed version.

    The version string is derived from the output of vboxmanage_version(). The
    pack is downloaded with wget into /tmp and then installed with "--replace".

    Returns:
        None
    """

    raw_version = vboxmanage_version()
    # Greedy match: keeps everything before the last "r" on the first line
    # (presumably stripping the revision suffix, e.g. "6.1.38r153438").
    version_path = re.match(b"(.*)r", raw_version).group(1).decode("utf-8")

    LOG.info("Downloading extension pack")
    filename = f"Oracle_VM_VirtualBox_Extension_Pack-{version_path}.vbox-extpack"
    download_cmd = [
        "wget",
        f"http://download.virtualbox.org/virtualbox/{version_path}/{filename}",
        "-P",
        "/tmp"
    ]
    LOG.info(subprocess.check_output(download_cmd, stderr=subprocess.STDOUT))

    LOG.info("Installing extension pack")
    install_cmd = ["vboxmanage", "extpack", "install", "/tmp/" + filename, "--replace"]
    LOG.info(subprocess.check_output(install_cmd, stderr=subprocess.STDOUT))
|
|
|
|
|
|
def get_all_vms(labname, option="vms"):
    """
    Return a list of virtual machines (VMs) belonging to a specified lab.

    A VM is returned when its name contains the lab name and one of the node
    markers ("controller-", "worker-", "storage-"), and its "groups" entry in
    the machine-readable VM info equals '"/<labname>"'.

    Args:
        labname (str): The name of the lab to which the VMs belong.
        option (str, optional): The vboxmanage command option to use when listing VMs.
            Defaults to "vms".

    Returns:
        list: A list of strings representing the names of the VMs that belong to the specified lab.
    """

    # vboxmanage_list() yields bytes, so encode the lab name once for matching.
    lab_bytes = labname.encode("utf-8")
    node_markers = (b"controller-", b"worker-", b"storage-")

    # Reduce the number of VMs we query
    initial_node_list = [
        item.decode("utf-8")
        for item in vboxmanage_list(option)
        if lab_bytes in item and any(marker in item for marker in node_markers)
    ]

    # Filter by group
    node_list = []
    group = f'"/{labname}"'.encode("utf-8")
    for item in initial_node_list:
        for line in vboxmanage_showinfo(item).splitlines():
            try:
                k_value, v_value = line.split(b"=")
            except ValueError:
                # Lines that are not a single key=value pair are irrelevant here.
                continue
            if k_value == b"groups" and v_value == group:
                node_list.append(item)

    return node_list
|
|
|
|
|
|
def take_snapshot(labname, snapshot_name):
    """
    Take a snapshot of every VM in a lab.

    Running VMs are handed to the pause/resume helpers around the snapshot,
    and afterwards the function waits for them to be reported as running.

    Args:
        labname (str): The name of the lab whose VMs will be snapshotted.
        snapshot_name (str): The name of the snapshot to be taken.

    Returns:
        None
    """

    all_vms = get_all_vms(labname, option="vms")
    active_vms = get_all_vms(labname, option="runningvms")

    LOG.info("#### Taking snapshot %s of lab %s", snapshot_name, labname)
    LOG.info("VMs in lab %s: %s", labname, all_vms)
    LOG.info("VMs running in lab %s: %s", labname, active_vms)

    _pause_running_vms(active_vms, all_vms)
    if all_vms:
        vboxmanage_takesnapshot(all_vms, snapshot_name)
    _resume_running_vms(active_vms)

    LOG.info("Waiting 10s before running VMs")
    time.sleep(10)

    if not active_vms:
        return
    _wait_for_vms_to_run(labname, active_vms, all_vms)
|
|
|
|
|
|
def _pause_running_vms(runningvms, vms):
    """Pause running virtual machines.

    Each VM is paused from its own forked child process so the pause requests
    run in parallel. Nothing is done unless more than one VM is running.

    Args:
        runningvms (list): A list of strings representing the names of running virtual machines.
        vms (list): A list of strings representing the names of all virtual machines.
            Kept for interface compatibility; only the running VMs are acted on.

    Returns:
        None
    """

    if len(runningvms) > 1:
        child_pids = []
        for node in runningvms:
            newpid = os.fork()
            if newpid == 0:
                # Child: always terminate here, even if the control call raises,
                # so it can never continue executing the parent's code path.
                status = 1
                try:
                    vboxmanage_controlvms([node], "pause")
                    status = 0
                finally:
                    os._exit(status)  # pylint: disable=protected-access
            child_pids.append(newpid)

        # Reap exactly the children that were forked. Waiting once per entry
        # of `vms` raises ChildProcessError when some VMs were not running.
        for pid in child_pids:
            os.waitpid(pid, 0)
        # NOTE(review): short settle delay after the pause requests; the
        # original placement relative to the wait loop should be confirmed.
        time.sleep(2)
|
|
|
|
|
|
def _resume_running_vms(runningvms):
    """Resume paused virtual machines.

    Each VM is resumed from its own forked child process so the resume
    requests run in parallel. Nothing is done unless more than one VM is given.

    Args:
        runningvms (list): A list of strings representing the names of running virtual machines.

    Returns:
        None
    """

    if len(runningvms) > 1:
        child_pids = []
        for node in runningvms:
            newpid = os.fork()
            if newpid == 0:
                # Child: always terminate here, even if the control call raises,
                # so it can never continue executing the parent's code path.
                status = 1
                try:
                    vboxmanage_controlvms([node], "resume")
                    status = 0
                finally:
                    os._exit(status)  # pylint: disable=protected-access
            child_pids.append(newpid)

        # Wait for the specific children forked above rather than for any child.
        for pid in child_pids:
            os.waitpid(pid, 0)
|
|
|
|
|
|
def _wait_for_vms_to_run(labname, runningvms, vms):
    """Wait until the previously running virtual machines are running again.

    Polls the list of running VMs of the lab once per second, up to 20 times,
    until at least as many VMs are running as were running before.

    Args:
        labname (str): The name of the lab whose virtual machines are being waited for.
        runningvms (list): A list of strings representing the names of the virtual
            machines that were running before the snapshot.
        vms (list): A list of strings representing the names of all virtual machines
            (used for logging only).

    Returns:
        None
    """

    max_retries = 20
    new_vms = get_all_vms(labname, option="runningvms")
    for _ in range(max_retries):
        LOG.info(
            "Waiting for VMs to come up running after taking snapshot..."
            "Up VMs are %s ",
            new_vms,
        )
        # Done once no fewer VMs are running than before the snapshot.
        if len(new_vms) >= len(runningvms):
            LOG.info("All VMs %s are up running after taking snapshot...", vms)
            return
        time.sleep(1)
        new_vms = get_all_vms(labname, option="runningvms")

    LOG.warning(
        "Not all VMs came up running after %s retries. Up VMs are %s",
        max_retries,
        new_vms,
    )
|
|
|
|
|
|
def restore_snapshot(node_list, name):
    """
    Restore a named snapshot on a list of virtual machines and restart them.

    The VMs are powered off, the snapshot is restored on each one, and the VMs
    are started again: every host whose name does not contain "controller-0"
    first, then the "controller-0" host(s).

    Args:
        node_list (list): A list of strings representing the names of the virtual machines
            whose snapshot will be restored.
        name (str): The name of the snapshot to restore.

    Returns:
        None
    """

    LOG.info("Restore snapshot of %s for hosts %s", name, node_list)
    if not node_list:
        return

    vboxmanage_controlvms(node_list, "poweroff")
    LOG.info("Waiting 5s")
    time.sleep(5)

    for host in node_list:
        vboxmanage_restoresnapshot(host, name)
    LOG.info("Waiting 5s")
    time.sleep(5)

    # Two passes over the list: the second pass handles controller-0 last.
    for start_controller_0 in (False, True):
        for host in node_list:
            if ("controller-0" in host) == start_controller_0:
                vboxmanage_startvm(host)
                LOG.info("Waiting 10s")
                time.sleep(10)
|
|
|
|
|
|
def vboxmanage_list(option="vms"):
    """
    Return the names listed by "vboxmanage list <option>".

    Args:
        option (str, optional): What to list (e.g. "vms", "runningvms").
            Defaults to "vms".

    Returns:
        list: The quoted name at the start of each output line, as bytes.
    """

    cmd = ["vboxmanage", "list", option]
    result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    vms_list = []
    for item in result.splitlines():
        vm_name = re.match(b'"(.*?)"', item)
        if vm_name is None:
            # stderr is merged into the output, so lines that do not start
            # with a quoted name (warnings, blank lines) are skipped.
            continue
        vms_list.append(vm_name.group(1))

    return vms_list
|
|
|
|
|
|
def vboxmanage_showinfo(host):
    """
    Return the machine-readable information of a VM.

    Args:
        host (str or bytes): Name of the virtual machine. Bytes are decoded as UTF-8.

    Returns:
        bytes: Raw output of "vboxmanage showvminfo <host> --machinereadable".
    """

    if not isinstance(host, str):
        # Keep the decoded value; the result of decode() was previously dropped.
        host = host.decode("utf-8")
    cmd = ["vboxmanage", "showvminfo", host, "--machinereadable"]
    result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    return result
|
|
|
|
|
|
def vboxmanage_createvm(hostname, labname):
    """
    Create and register a "Linux_64" VM in the VirtualBox group "/<labname>".

    Args:
        hostname (str): Name of the VM to create.
        labname (str): Name of the lab, used as the VM group.

    Returns:
        None
    """

    assert hostname, "Hostname is required"
    assert labname, "Labname is required"

    create_cmd = ["vboxmanage", "createvm"]
    create_cmd += ["--name", hostname]
    create_cmd += ["--register"]
    create_cmd += ["--ostype", "Linux_64"]
    create_cmd += ["--groups", "/" + labname]

    LOG.info("Creating VM %s", hostname)
    subprocess.check_output(create_cmd, stderr=subprocess.STDOUT)
|
|
|
|
|
|
def vboxmanage_deletevms(hosts=None):
    """
    Unregister and delete a list of VMs, then verify they are gone.

    Args:
        hosts (list): Names of the VMs to delete. Must not be empty.

    Returns:
        None

    Raises:
        AssertionError: If no hosts are given or a deleted VM is still listed.
        subprocess.CalledProcessError: If unregistering a VM fails.
    """

    assert hosts, "A list of hostname(s) is required"

    for hostname in hosts:
        LOG.info("Deleting VM %s", hostname)
        cmd = ["vboxmanage", "unregistervm", hostname, "--delete"]
        subprocess.check_output(cmd, stderr=subprocess.STDOUT)

    LOG.info("Waiting 10s")
    time.sleep(10)

    # in case medium is still present after delete
    for hostname in hosts:
        vboxmanage_deletemedium(hostname)

    # vboxmanage_list() returns bytes while hosts are normally str; compare on
    # a common text form so the check can actually detect a leftover VM.
    vms_list = vboxmanage_list("vms")
    remaining = {
        vm.decode("utf-8") if isinstance(vm, bytes) else vm for vm in vms_list
    }
    for items in hosts:
        host_name = items.decode("utf-8") if isinstance(items, bytes) else items
        assert (
            host_name not in remaining
        ), f"The following vms are unexpectedly present {vms_list}"
|
|
|
|
|
|
def vboxmanage_hostonlyifcreate(name="vboxnet0", oam_ip=None, netmask=None):
    """
    Create a host-only interface and assign an IP configuration to `name`.

    Args:
        name (str): Name of the host-only interface to configure. Defaults to "vboxnet0".
        oam_ip (str): IP address to assign to the interface.
        netmask (str): Netmask to assign to the interface.

    Returns:
        None
    """

    assert name, "Must provide network name"
    assert oam_ip, "Must provide an OAM IP"
    assert netmask, "Must provide an OAM Netmask"

    LOG.info("Creating Host-only Network")
    subprocess.check_output(
        ["vboxmanage", "hostonlyif", "create"], stderr=subprocess.STDOUT
    )

    LOG.info("Provisioning %s with IP %s and Netmask %s", name, oam_ip, netmask)
    ipconfig_cmd = ["vboxmanage", "hostonlyif", "ipconfig", name]
    ipconfig_cmd += ["--ip", oam_ip, "--netmask", netmask]
    subprocess.check_output(ipconfig_cmd, stderr=subprocess.STDOUT)
|
|
|
|
|
|
def vboxmanage_hostonlyifdelete(name="vboxnet0"):
    """
    Remove a host-only interface.

    Used as a workaround for having created too many host-only interfaces.

    Args:
        name (str): Name of the host-only interface to remove. Defaults to "vboxnet0".

    Returns:
        None
    """

    assert name, "Must provide network name"
    LOG.info("Removing Host-only Network")
    subprocess.check_output(
        ["vboxmanage", "hostonlyif", "remove", name], stderr=subprocess.STDOUT
    )
|
|
|
|
|
|
def vboxmanage_modifyvm(hostname, vm_config=None):
    """
    Modify a virtual machine according to a specified configuration.

    Args:
        hostname(str): Name of host to modify
        vm_config (dict): A dictionary representing the configuration options
            for the virtual machine. Possible key values: cpus, memory, nic, nictype,
            nicpromisc, nicnum, intnet, hostonlyadapter, natnetwork, uartbase,
            uartport, uartmode, uartpath, nicbootprio2=1, prefix="".
            The dictionary is not modified; None is treated as an empty configuration.

    Returns:
        None
    """

    # Work on a copy so the caller's dictionary is left untouched, and accept
    # the documented default of None.
    vm_config = dict(vm_config or {})

    # put default values in nicbootprio2 and prefix if they not exist
    vm_config.setdefault("nicbootprio2", 1)
    vm_config.setdefault("prefix", "")

    cmd = ["vboxmanage", "modifyvm", hostname]

    # subprocess only accepts string arguments, so numeric values are converted.
    if _contains_value("cpus", vm_config):
        cmd.extend(["--cpus", str(vm_config["cpus"])])

    if _contains_value("memory", vm_config):
        cmd.extend(["--memory", str(vm_config["memory"])])

    if _is_network_configured(vm_config):
        cmd.extend(_get_network_configuration(vm_config))
    elif _is_nat_network_configured(vm_config):
        cmd.extend([f'--nic{vm_config["nicnum"]}', "nat"])

    if _is_uart_configured(vm_config):
        cmd.extend(_add_uart(vm_config))

    if _contains_value("nicbootprio2", vm_config):
        cmd.extend(["--nicbootprio2", f'{vm_config["nicbootprio2"]}'])

    cmd.extend(["--boot4", "net"])

    LOG.info("#### Updating VM %s configuration", hostname)
    LOG.info("#### Executing command on the host machine:\n$ %s\n", ' '.join(str(i) for i in cmd))
    subprocess.check_output(cmd, stderr=subprocess.STDOUT)
|
|
|
|
|
|
def _is_network_configured(vm_config):
    """
    Tell whether vm_config carries a complete network interface definition.

    The keys "nic", "nictype", "nicpromisc" and "nicnum" must all be present
    with truthy values.

    Args:
        vm_config (dict): A dictionary representing the configuration options for the VM.

    Returns:
        A truthy value if a network interface is configured, a falsy one otherwise
        (the result of the last key check that was evaluated).
    """

    outcome = False
    for required_key in ("nic", "nictype", "nicpromisc", "nicnum"):
        outcome = _contains_value(required_key, vm_config)
        if not outcome:
            break
    return outcome
|
|
|
|
|
|
def _get_network_configuration(vm_config):
    """
    Build the vboxmanage options describing one network interface.

    The interface number comes from vm_config["nicnum"]. "intnet",
    "hostonlyadapter" and "natnetwork" options are appended only when the
    corresponding key holds a truthy value; the internal network name is
    prefixed with "<prefix>-" when a prefix is configured.

    Args:
        vm_config (dict): A dictionary representing the configuration options for the VM.

    Returns:
        list: A list of command-line options for the network interface.
    """

    nicnum = vm_config["nicnum"]
    options = [
        f"--nic{nicnum}",
        vm_config["nic"],
        f"--nictype{nicnum}",
        vm_config["nictype"],
        f"--nicpromisc{nicnum}",
        vm_config["nicpromisc"],
    ]

    if _contains_value("intnet", vm_config):
        intnet = f"{vm_config['intnet']}"
        if _contains_value("prefix", vm_config):
            intnet = f"{vm_config['prefix']}-{intnet}"
        options += [f"--intnet{nicnum}", intnet]

    if _contains_value("hostonlyadapter", vm_config):
        options += [f"--hostonlyadapter{nicnum}", vm_config["hostonlyadapter"]]

    if _contains_value("natnetwork", vm_config):
        options += [f"--nat-network{nicnum}", vm_config["natnetwork"]]

    return options
|
|
|
|
|
|
def _is_nat_network_configured(vm_config):
    """
    Tell whether vm_config describes a NAT interface.

    That is the case when "nicnum" holds a truthy value and "nictype" is "nat".

    Args:
        vm_config (dict): A dictionary representing the configuration options for the VM.

    Returns:
        True if the NAT network is configured, a falsy value otherwise.
    """

    has_nicnum = _contains_value("nicnum", vm_config)
    if not has_nicnum:
        return has_nicnum
    return vm_config.get("nictype") == "nat"
|
|
|
|
|
|
def _is_uart_configured(vm_config):
    """
    Tell whether vm_config carries a complete UART definition.

    The keys "uartbase", "uartport", "uartmode" and "uartpath" must all be
    present with truthy values.

    Args:
        vm_config (dict): A dictionary representing the configuration options for the VM.

    Returns:
        A truthy value if the UART device is configured, a falsy one otherwise
        (the result of the last key check that was evaluated).
    """

    outcome = False
    for required_key in ("uartbase", "uartport", "uartmode", "uartpath"):
        outcome = _contains_value(required_key, vm_config)
        if not outcome:
            break
    return outcome
|
|
|
|
|
|
def _add_uart(vm_config):
|
|
"""
|
|
Constructs a list of options for the UART device based on the values in vm_config.
|
|
|
|
Args:
|
|
vm_config (dict): A dictionary representing the configuration options for the VM.
|
|
|
|
Returns:
|
|
list: A list of command-line options for the UART device.
|
|
"""
|
|
|
|
uart_config = ["--uart1"]
|
|
uart_config.extend([f'{vm_config["uartbase"]}'])
|
|
uart_config.extend([f'{vm_config["uartport"]}'])
|
|
uart_config.extend(["--uartmode1"])
|
|
uart_config.extend([f'{vm_config["uartmode"]}'])
|
|
uart_config.extend([f'{vm_config["uartpath"]}'])
|
|
|
|
return uart_config
|
|
|
|
|
|
def _contains_value(key, dictionary):
|
|
return key in dictionary and dictionary[key]
|
|
|
|
|
|
def vboxmanage_storagectl(hostname=None, storectl="sata", hostiocache="off"):
    """
    Add a storage controller to a VM.

    The controller type is also used as the controller's name.

    Args:
        hostname (str): Name of the virtual machine.
        storectl (str): Type (and name) of the storage controller. Defaults to "sata".
        hostiocache (str): Value for the "--hostiocache" option. Defaults to "off".

    Returns:
        None
    """

    assert hostname, "Hostname is required"
    assert storectl, "Type of storage controller is required"

    LOG.info("Creating %s storage controller on VM %s", storectl, hostname)
    storagectl_cmd = ["vboxmanage", "storagectl", hostname]
    storagectl_cmd += ["--name", storectl]
    storagectl_cmd += ["--add", storectl]
    storagectl_cmd += ["--hostiocache", hostiocache]
    subprocess.check_output(storagectl_cmd, stderr=subprocess.STDOUT)
|
|
|
|
|
|
def vboxmanage_storageattach(hostname, storage_config):
    """
    Attach a disk to a storage controller of a VM.

    Args:
        hostname (str): Name of the virtual machine.
        storage_config (dict): A dictionary containing the config options for the storage device.
            Possible key values: storectl (default "sata"), storetype (default "hdd"),
            disk (required), port_num (default "0"), device_num (default "0").

    Returns:
        str: The output of the vboxmanage command.
    """

    assert hostname, "Hostname is required"
    assert storage_config and isinstance(storage_config, dict), "Storage configuration is required"

    lookup = storage_config.get
    storectl, storetype = lookup("storectl", "sata"), lookup("storetype", "hdd")
    disk = lookup("disk")
    port_num, device_num = lookup("port_num", "0"), lookup("device_num", "0")

    assert disk, "Disk name is required"
    assert storectl, "Name of storage controller is required"
    assert storetype, "Type of storage controller is required"

    LOG.info(
        "Attaching %s storage to storage controller %s on VM %s",
        storetype,
        storectl,
        hostname,
    )
    attach_cmd = ["vboxmanage", "storageattach", hostname]
    attach_cmd += ["--storagectl", storectl]
    attach_cmd += ["--medium", disk]
    attach_cmd += ["--type", storetype]
    attach_cmd += ["--port", port_num]
    attach_cmd += ["--device", device_num]
    return subprocess.check_output(attach_cmd, stderr=subprocess.STDOUT)
|
|
|
|
|
|
def vboxmanage_deletemedium(hostname, vbox_home_dir="/home"):
    """
    Deletes the disk medium associated with a virtual machine.

    Every regular file in "<vbox_home_dir>/<user>/vbox_disks/" whose name
    contains the hostname is closed in VirtualBox (with "--delete") and then
    removed from disk. Failures are logged and do not abort the cleanup.
    Nothing is done on Windows platforms or when the directory does not exist.

    Args:
        hostname (str): The name of the virtual machine to which the disk medium is attached.
        vbox_home_dir (str): The directory in which the disk medium files are stored.
            Defaults to "/home".

    Returns:
        None
    """

    assert hostname, "Hostname is required"

    if platform in ("win32", "win64"):
        return

    username = getpass.getuser()
    vbox_home_dir = f"{vbox_home_dir}/{username}/vbox_disks/"

    try:
        entries = os.listdir(vbox_home_dir)
    except FileNotFoundError:
        # No disk directory means there is no medium left to clean up.
        LOG.info("Disk directory %s not found, no mediums to delete", vbox_home_dir)
        return

    disk_list = [
        f
        for f in entries
        if os.path.isfile(os.path.join(vbox_home_dir, f)) and hostname in f
    ]
    LOG.info("Disk mediums to delete: %s", disk_list)
    for disk in disk_list:
        LOG.info("Disconnecting disk %s from vbox.", disk)
        try:
            cmd = [
                "vboxmanage",
                "closemedium",
                "disk",
                f"{vbox_home_dir}{disk}",
                "--delete",
            ]
            result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
            LOG.info(result)
        except subprocess.CalledProcessError as exception:
            # Continue if failures, disk may not be present
            LOG.warning(
                "Error disconnecting disk, continuing. "
                "Details: stdout: %s stderr: %s",
                exception.stdout,
                exception.stderr,
            )
        LOG.info("Removing backing file %s", disk)
        try:
            os.remove(f"{vbox_home_dir}{disk}")
        except OSError as exc:
            # Best effort: the file is usually already gone after closemedium.
            LOG.debug("Failure at removing backing file\nError: %s\n", repr(exc))
|
|
|
|
|
|
def vboxmanage_createmedium(hostname=None, disk_list=None, vbox_home_dir="/home"):
    """
    Create the VDI disks of a VM and attach them to its "sata" controller.

    Disks are named "<hostname>_disk_<n>" (n starting at 1) inside the user's
    "vbox_disks" directory and attached on consecutive ports of device 0.

    Args:
        hostname (str): Name of the virtual machine.
        disk_list (list): Disk sizes, one entry per disk to create.
        vbox_home_dir (str): Base directory for non-Windows hosts. Defaults to "/home".

    Returns:
        None

    Raises:
        subprocess.CalledProcessError: If creating a disk fails.
    """

    assert hostname, "Hostname is required"
    assert disk_list, "A list of disk sizes is required"

    username = getpass.getuser()
    if platform in ("win32", "win64"):
        disk_dir = "C:\\Users\\" + username + "\\vbox_disks\\"
    else:
        disk_dir = vbox_home_dir + "/" + username + "/vbox_disks/"

    device_num = 0
    for port_num, disk in enumerate(disk_list):
        file_name = disk_dir + hostname + f"_disk_{port_num + 1}"
        LOG.info(
            "Creating disk %s of size %s on VM %s on device %s port %s",
            file_name,
            disk,
            hostname,
            device_num,
            port_num,
        )

        create_cmd = ["vboxmanage", "createmedium", "disk"]
        create_cmd += ["--size", str(disk)]
        create_cmd += ["--filename", file_name]
        create_cmd += ["--format", "vdi"]
        create_cmd += ["--variant", "standard"]
        try:
            LOG.info(subprocess.check_output(create_cmd, stderr=subprocess.STDOUT))
        except subprocess.CalledProcessError as exception:
            LOG.error("Error stdout: %s stderr: %s", exception.stdout, exception.stderr)
            raise

        attach_config = {
            "storectl": "sata",
            "storetype": "hdd",
            "disk": file_name + ".vdi",
            "port_num": str(port_num),
            "device_num": str(device_num),
        }
        vboxmanage_storageattach(hostname, attach_config)

    LOG.info("Waiting 5s")
    time.sleep(5)
|
|
|
|
|
|
def vboxmanage_startvm(hostname=None, headless=False, force=False):
    """
    Power on a VM and wait until it is reported as running.

    Args:
        hostname (str): Name of the virtual machine to start.
        headless (bool): Start the VM with the "headless" front end instead of "gui".
        force (bool): Skip the "already running" check and always issue the start command.

    Returns:
        None

    Raises:
        RuntimeError: If the VM is not listed as running within about 20 seconds.
        subprocess.CalledProcessError: If the start command fails.
    """

    assert hostname, "Hostname is required"

    if not force:
        LOG.info("Check if VM is running")
        running_vms = vboxmanage_list(option="runningvms")
    else:
        running_vms = []

    interface_type = "headless" if headless else "gui"
    # vboxmanage_list() returns names as bytes.
    host_bytes = hostname.encode("utf-8")

    if host_bytes in running_vms:
        LOG.info("Host %s is already started", hostname)
        return

    LOG.info("Powering on VM %s", hostname)
    cmd = ["vboxmanage", "startvm", hostname, "--type", interface_type]
    result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    LOG.info(result)

    # Wait for VM to start
    for _ in range(20):
        if host_bytes in vboxmanage_list(option="runningvms"):
            break
        time.sleep(1)
    else:
        # A plain string cannot be raised; use a real exception type.
        raise RuntimeError(f"Failed to start VM: {hostname}")
    LOG.info("VM '%s' started.", hostname)
|
|
|
|
|
|
def vboxmanage_controlvms(hosts=None, action=None):
    """
    Run a "vboxmanage controlvm" action (e.g. pause, resume, poweroff) on VMs.

    The exit status of each command is ignored.

    Args:
        hosts (list): Names of the virtual machines to act on.
        action (str): The controlvm action to execute.

    Returns:
        None
    """

    assert hosts, "Hostname is required"
    assert action, "Need to provide an action to execute"

    for vm_name in hosts:
        LOG.info("Executing %s action on VM %s", action, vm_name)
        control_cmd = ["vboxmanage", "controlvm", vm_name, action]
        subprocess.call(control_cmd, stderr=subprocess.STDOUT)
    time.sleep(1)
|
|
|
|
|
|
def vboxmanage_takesnapshot(hosts=None, name=None):
    """
    Take a snapshot with the given name on each listed VM.

    The exit status of each command is ignored.

    Args:
        hosts (list): Names of the virtual machines to snapshot.
        name (str): Name of the snapshot.

    Returns:
        None
    """

    assert hosts, "Hostname is required"
    assert name, "Need to provide a name for the snapshot"

    for vm_name in hosts:
        LOG.info("Taking snapshot %s on VM %s", name, vm_name)
        snapshot_cmd = ["vboxmanage", "snapshot", vm_name, "take", name]
        subprocess.call(snapshot_cmd, stderr=subprocess.STDOUT)
|
|
|
|
|
|
def vboxmanage_restoresnapshot(host=None, name=None):
    """
    Restore a named snapshot on a single VM, then wait 10 seconds.

    The exit status of the command is ignored.

    Args:
        host (str): Name of the virtual machine.
        name (str): Name of the snapshot to restore.

    Returns:
        None
    """

    assert host, "Hostname is required"
    assert name, "Need to provide the snapshot to restore"

    LOG.info("Restoring snapshot %s on VM %s", name, host)
    restore_cmd = ["vboxmanage", "snapshot", host, "restore", name]
    subprocess.call(restore_cmd, stderr=subprocess.STDOUT)

    LOG.info("Waiting 10s")
    time.sleep(10)
|
|
|
|
|
|
def vboxmanage_getrulename(network, local_port):
    """
    Get port-forwarding rule for given NAT network and local port in VirtualBox.

    The output of "vboxmanage list natnets --long" is split into one section
    per network; within the matching section, the lines following the
    "Port-forwarding (ipv4)" header are read as colon-separated rules whose
    fourth field is the local (host) port.

    Args:
        network (str): Name of the NAT network.
        local_port (str): The local port number.

    Returns:
        (str): Name of rule or empty
    """

    # List information about all nat networks in VirtualBox
    cmd = ["vboxmanage", "list", "natnets", "--long"]
    result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    natpattern = r"Name:(.*?)loopback mappings \(ipv4\)"
    natnetworks = re.findall(natpattern, result.decode(), re.DOTALL)

    # Get the rule name of the given local port in the given natnetwork
    for natnetwork in natnetworks:
        # Strip every line so trailing whitespace or "\r" cannot break the
        # exact comparisons below.
        natinfo = [line.strip() for line in natnetwork.strip().split('\n')]
        if natinfo[0] != network:
            continue
        try:
            startindex = natinfo.index("Port-forwarding (ipv4)")
        except ValueError:
            # If no index is found the function return an empty string
            return ""
        for rule in natinfo[startindex + 1:]:
            parsed_rule = rule.split(':')
            # Skip blank or malformed lines instead of failing on them.
            if len(parsed_rule) < 4 or not parsed_rule[3].isdigit():
                continue
            if int(parsed_rule[3]) == int(local_port):
                return parsed_rule[0]
    return ""
|
|
|
|
|
|
def vboxmanage_addportforward(rule_name, local_port, guest_ip, guest_port, network):
    """
    Add a TCP port-forwarding rule to a NAT network in VirtualBox.

    Args:
        rule_name (str): Name of the port-forward rule to be added.
        local_port (str): The local port number to forward.
        guest_ip (str): The IP address of the guest to forward to.
        guest_port (str): The port number on the guest to forward to.
        network (str): Name of the NAT network.

    Returns:
        True if the port was added
        False if an error occurred when trying to add the port-forward rule.
    """

    rule = f"{rule_name}:tcp:[]:{local_port}:[{guest_ip}]:{guest_port}"

    LOG.info("Creating port-forwarding rule to: %s", rule)
    modify_cmd = ["vboxmanage", "natnetwork", "modify"]
    modify_cmd += ["--netname", network]
    modify_cmd += ["--port-forward-4", rule]
    try:
        subprocess.check_output(modify_cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        LOG.error("Error while trying to create port-forwarding rule. Continuing installation!")
        return False
    else:
        return True
|
|
|
|
|
|
def vboxmanage_deleteportforward(rule_name, network):
    """
    Delete a port-forwarding rule from a NAT network in VirtualBox.

    A failing command is logged and otherwise ignored.

    Args:
        rule_name (str): Name of the port-forward rule to be deleted.
        network (str): Name of the NAT network.

    Returns:
        None
    """

    LOG.info(
        "Removing previous forwarding rule '%s' from NAT network '%s'",
        rule_name,
        network,
    )
    modify_cmd = ["vboxmanage", "natnetwork", "modify"]
    modify_cmd += ["--netname", network]
    modify_cmd += ["--port-forward-4", "delete", rule_name]
    try:
        subprocess.check_output(modify_cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        LOG.error("Error while trying to delete port-forwarding rule. Continuing installation!")
|
|
|
|
|
|
def vboxmanage_createnatnet(network, cidr):
    """
    Create a NatNetwork unless one with the same name already exists.

    An existing network is only checked against the requested CIDR. A new
    network is created with DHCP off and IPv6 on.

    Args:
        network (str): Name of the NAT network.
        cidr (str): CIDR for the NAT network.

    Returns:
        True if the network was created, or already exists with the given CIDR.
        False if the network already exists with a different CIDR.

    Raises:
        subprocess.CalledProcessError: If the creation command fails.
    """

    if vboxmanage_natnetexists(network):
        LOG.info('NatNetwork named "%s" already exists, skipping creation.', network)
        return bool(vboxmanage_checkcidr(network, cidr))

    add_cmd = ["vboxmanage", "natnetwork", "add"]
    add_cmd += ["--netname", network]
    add_cmd += ["--network", cidr]
    add_cmd += ["--dhcp", "off"]
    add_cmd += ["--ipv6", "on"]

    try:
        subprocess.check_output(add_cmd, stderr=subprocess.STDOUT)
        LOG.info('NatNetwork named "%s" was sucessfully created.', network)
    except subprocess.CalledProcessError:
        LOG.error("Error while trying to create NatNetwork")
        raise

    return True
|
|
|
|
|
|
def vboxmanage_natnetexists(network):
    """
    Verify if NatNetwork already exists

    The names are read line by line from "vboxmanage list natnets --long",
    so a network name may contain any text (including "IP").

    Args:
        network (str): Name of the NAT network.

    Returns:
        True if the NatNetwork exists.
        False if the NatNetwork does not exist.
    """

    cmd = ["vboxmanage", "list", "natnets", "--long"]
    result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    # Accept both a "NetworkName:" and a plain "Name:" label at line start;
    # the other natnet parsers in this module match on "Name:" as well.
    name_pattern = r"^(?:Network)?Name:[ \t]*(.*?)[ \t\r]*$"
    natnames = re.findall(name_pattern, result.decode(), re.MULTILINE)
    return network in natnames
|
|
|
|
|
|
def vboxmanage_checkcidr(network, cidr):
    """
    Check whether an existing NatNetwork uses the expected CIDR.

    Each network section of "vboxmanage list natnets --long" is split into
    lines; the first line is taken as the network name and the CIDR is
    searched for in the third line.

    Args:
        network (str): Name of the NAT network.
        cidr (str): CIDR for the NAT network.

    Returns:
        True if CIDR is correct for the given NAT network.
        False if CIDR is different for the given NAT network.
    """

    listing = subprocess.check_output(
        ["vboxmanage", "list", "natnets", "--long"], stderr=subprocess.STDOUT
    ).decode()
    sections = (
        section.strip().split('\n')
        for section in re.findall(r"Name:(.*?)IPv6 Enabled", listing, re.DOTALL)
    )
    return any(
        network == fields[0] and cidr in fields[2] for fields in sections
    )