diff --git a/devops/devops/__init__.py b/devops/devops/__init__.py index 4e3272caf..108acf130 100644 --- a/devops/devops/__init__.py +++ b/devops/devops/__init__.py @@ -1,2 +1,2 @@ -from .main import * - +from .main import * + diff --git a/devops/devops/__main__.py b/devops/devops/__main__.py index 25bf41c83..01631dd12 100644 --- a/devops/devops/__main__.py +++ b/devops/devops/__main__.py @@ -1,31 +1,31 @@ -from main import * - -def saved(args): - c = getController() - for saved_env in c.saved_environments: - print(saved_env) - -def resume(args): - parser = argparse.ArgumentParser(prog='devops resume') - parser.add_argument('environment') - arguments = parser.parse_args(args) - env = load(arguments.environment) - import code - code.InteractiveConsole(locals={'environment': env}).interact() - -import sys -import argparse - -parser = argparse.ArgumentParser(prog='devops') -parser.add_argument('command', choices=['saved', 'resume']) -parser.add_argument('command_args', nargs=argparse.REMAINDER) -arguments = parser.parse_args() - -if arguments.command == 'saved': - saved(arguments.command_args) -elif arguments.command == 'resume': - resume(arguments.command_args) -else: - help() - sys.exit(1) - +from main import * + +def saved(args): + c = getController() + for saved_env in c.saved_environments: + print(saved_env) + +def resume(args): + parser = argparse.ArgumentParser(prog='devops resume') + parser.add_argument('environment') + arguments = parser.parse_args(args) + env = load(arguments.environment) + import code + code.InteractiveConsole(locals={'environment': env}).interact() + +import sys +import argparse + +parser = argparse.ArgumentParser(prog='devops') +parser.add_argument('command', choices=['saved', 'resume']) +parser.add_argument('command_args', nargs=argparse.REMAINDER) +arguments = parser.parse_args() + +if arguments.command == 'saved': + saved(arguments.command_args) +elif arguments.command == 'resume': + resume(arguments.command_args) +else: + help() + sys.exit(1) + diff --git a/devops/devops/controller.py b/devops/devops/controller.py index e4d578171..b352ce62a 100644 --- a/devops/devops/controller.py +++ b/devops/devops/controller.py @@ -1,296 +1,296 @@ -import os -import sys -import stat -import tempfile -import shutil -import urllib -import ipaddr -import glob -import random -import string -import re -import time - -from model import Node, Network -from network import IpNetworksPool -from error import DevopsError -import my_yaml - -import logging -logger = logging.getLogger('devops.controller') - -def randstr(length=8): - return ''.join(random.choice(string.ascii_letters) for i in xrange(length)) - - -class Controller: - def __init__(self, driver): - self.driver = driver - - self.networks_pool = IpNetworksPool() - self._reserve_networks() - - self.home_dir = os.environ.get('DEVOPS_HOME') or os.path.join(os.environ['HOME'], ".devops") - try: - os.makedirs(os.path.join(self.home_dir, 'environments'), 0755) - except OSError: - sys.exc_clear() - - def build_environment(self, environment): - logger.info("Building environment %s" % environment.name) - - env_id = getattr(environment, 'id', '-'.join([environment.name, randstr()])) - environment.id = env_id - - logger.debug("Creating environment working directory for %s environment" % environment.name) - environment.work_dir = os.path.join(self.home_dir, 'environments', environment.id) - os.mkdir(environment.work_dir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - logger.debug("Environment working directory has been created: %s" % environment.work_dir) - - environment.driver = self.driver - - for node in environment.nodes: - for interface in node.interfaces: - interface.node = node - interface.network.interfaces.append(interface) - - for disk in node.disks: - if disk.base_image and disk.base_image.find('://') != -1: - disk.base_image = self._cache_file(disk.base_image) - - if node.cdrom: - if node.cdrom.isopath.find('://') != -1: - node.cdrom.isopath = self._cache_file(node.cdrom.isopath) - - for network in environment.networks: - logger.info("Building network %s" % network.name) - - network.ip_addresses = self.networks_pool.get() - - if network.pxe: - network.dhcp_server = True - tftp_path = os.path.join(environment.work_dir, "tftp") - if not os.path.exists(tftp_path): - os.mkdir(tftp_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - network.tftp_root_dir = tftp_path - - if network.dhcp_server: - allocated_addresses = [] - for interface in network.interfaces: - for address in interface.ip_addresses: - if address in network.ip_addresses: - allocated_addresses.append(address) - - next_address_index = 2 - for interface in network.interfaces: - if len(interface.ip_addresses) == 0: - while next_address_index < network.ip_addresses.numhosts and \ - network.ip_addresses[next_address_index] in allocated_addresses: - next_address_index += 1 - - if next_address_index >= network.ip_addresses.numhosts: - raise DevopsError, "Failed to allocated IP address for node '%s' in network '%s': no more addresses left" % (node.name, network.name) - - address = network.ip_addresses[next_address_index] - interface.ip_addresses.append(address) - allocated_addresses.append(next_address_index) - next_address_index += 1 - - network.dhcp_dynamic_address_start = network.ip_addresses[next_address_index] - network.dhcp_dynamic_address_end = network.ip_addresses[network.ip_addresses.numhosts-2] - - for network in environment.networks: - logger.info("Building network %s" % network.name) - - self.driver.create_network(network) - network.driver = self.driver - - for node in environment.nodes: - logger.info("Building node %s" % node.name) - - self._build_node(environment, node) - node.driver = self.driver - - for network in environment.networks: - self.driver.create_network(network) - network.start() - - environment.built = True - logger.info("Finished building environment %s" % environment.name) - - def destroy_environment(self, environment): - logger.info("Destroying environment %s" % environment.name) - - for node in environment.nodes: - logger.info("Destroying node %s" % node.name) - - node.stop() - - for snapshot in node.snapshots: - self.driver.delete_snapshot(node, snapshot) - - self.driver.delete_node(node) - del node.driver - - for network in environment.networks: - logger.info("Destroying network %s" % network.name) - - network.stop() - self.driver.delete_network(network) - del network.driver - - # FIXME - try: - self.networks_pool.put(network.ip_addresses) - except: - pass - - del environment.driver - - logger.info("Removing environment %s files" % environment.name) - - shutil.rmtree(environment.work_dir) - - logger.info("Finished destroying environment %s" % environment.name) - - def load_environment(self, environment_id): - env_work_dir = os.path.join(self.home_dir, 'environments', environment_id) - env_config_file = os.path.join(env_work_dir, 'config') - if not os.path.exists(env_config_file): - raise DevopsError, "Environment '%s' couldn't be found" % environment_id - - with file(env_config_file) as f: - data = f.read() - - environment = my_yaml.load(data) - - return environment - - def save_environment(self, environment): - data = my_yaml.dump(environment) - if not environment.built: - raise DevopsError, "Environment has not been built yet." - with file(os.path.join(environment.work_dir, 'config'), 'w') as f: - f.write(data) - - @property - def saved_environments(self): - saved_environments = [] - for path in glob.glob(os.path.join(self.home_dir, 'environments', '*')): - if os.path.exists(os.path.join(path, 'config')): - saved_environments.append(os.path.basename(path)) - return saved_environments - - def _reserve_networks(self): - logger.debug("Scanning for ip networks that are already taken") - with os.popen("ip route") as f: - for line in f: - words = line.split() - if len(words) == 0: - continue - if words[0] == 'default': - continue - address = ipaddr.IPv4Network(words[0]) - logger.debug("Reserving ip network %s" % address) - self.networks_pool.reserve(address) - - logger.debug("Finished scanning for taken ip networks") - - def _build_node(self, environment, node): - for disk in filter(lambda d: d.path is None, node.disks): - logger.debug("Creating disk file for node '%s'" % node.name) - fd, disk.path = tempfile.mkstemp( - prefix=environment.work_dir + '/disk', - suffix='.' + disk.format - ) - os.close(fd) - os.chmod(disk.path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH) - self.driver.create_disk(disk) - - logger.debug("Creating node '%s'" % node.name) - self.driver.create_node(node) - - def _cache_file(self, url): - cache_dir = os.path.join(self.home_dir, 'cache') - if not os.path.exists(cache_dir): - os.makedirs(cache_dir, 0755) - - cache_log_path = os.path.join(cache_dir, 'entries') - if os.path.exists(cache_log_path): - with file(cache_log_path) as f: - cache_entries = my_yaml.load(f.read()) - else: - cache_entries = dict() - - ext_cache_log_path = os.path.join(cache_dir, 'extended_entries') - if os.path.exists(ext_cache_log_path): - with file(ext_cache_log_path) as f: - extended_cache_entries = my_yaml.load(f.read()) - else: - extended_cache_entries = dict() - for key, value in cache_entries.items(): - if not extended_cache_entries.has_key(key): - extended_cache_entries[key] = {'cached-path':value} - - RFC1123_DATETIME_FORMAT = '%a, %d %b %Y %H:%M:%S %Z' - url_attrs = {} - cached_path = '' - local_mtime = 0 - - if extended_cache_entries.has_key(url): - url_attrs = extended_cache_entries[url] - cached_path = url_attrs['cached-path'] - - if url_attrs.has_key('last-modified'): - local_mtime = time.mktime( - time.strptime(url_attrs['last-modified'], RFC1123_DATETIME_FORMAT)) - - else: - logger.debug("Cache miss for '%s', downloading" % url) - - remote = urllib.urlopen(url) - msg = remote.info() - if msg.has_key('last-modified'): - url_mtime = time.mktime( - time.strptime(msg['last-modified'], RFC1123_DATETIME_FORMAT)) - else: - url_mtime = 0 - - if local_mtime >= url_mtime: - logger.debug("Cache is up to date for '%s': '%s'" % ( - url, cached_path)) - return cached_path - - elif cached_path != '': - logger.debug("Cache is old for '%s', downloading" % url) - - if not os.access(cached_path, os.W_OK): - try: - os.unlink(cached_path) - except Exception: - pass - fd, cached_path = tempfile.mkstemp(prefix=cache_dir+'/') - os.close(fd) - - with file(cached_path, 'w') as f: - shutil.copyfileobj(remote, f) - - os.chmod(cached_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) - - if msg.has_key('last-modified'): - url_attrs['last-modified'] = msg['last-modified'] - url_attrs['cached-path'] = cached_path - extended_cache_entries[url] = url_attrs - - with file(ext_cache_log_path, 'w') as f: - f.write(my_yaml.dump(extended_cache_entries)) - - cache_entries[url] = cached_path - - with file(cache_log_path, 'w') as f: - f.write(my_yaml.dump(cache_entries)) - - logger.debug("Cached '%s' to '%s'" % (url, cached_path)) - - return cached_path - +import os +import sys +import stat +import tempfile +import shutil +import urllib +import ipaddr +import glob +import random +import string +import re +import time + +from model import Node, Network +from network import IpNetworksPool +from error import DevopsError +import my_yaml + +import logging +logger = logging.getLogger('devops.controller') + +def randstr(length=8): + return ''.join(random.choice(string.ascii_letters) for i in xrange(length)) + + +class Controller: + def __init__(self, driver): + self.driver = driver + + self.networks_pool = IpNetworksPool() + self._reserve_networks() + + self.home_dir = os.environ.get('DEVOPS_HOME') or os.path.join(os.environ['HOME'], ".devops") + try: + os.makedirs(os.path.join(self.home_dir, 'environments'), 0755) + except OSError: + sys.exc_clear() + + def build_environment(self, environment): + logger.info("Building environment %s" % environment.name) + + env_id = getattr(environment, 'id', '-'.join([environment.name, randstr()])) + environment.id = env_id + + logger.debug("Creating environment working directory for %s environment" % environment.name) + environment.work_dir = os.path.join(self.home_dir, 'environments', environment.id) + os.mkdir(environment.work_dir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + logger.debug("Environment working directory has been created: %s" % environment.work_dir) + + environment.driver = self.driver + + for node in environment.nodes: + for interface in node.interfaces: + interface.node = node + interface.network.interfaces.append(interface) + + for disk in node.disks: + if disk.base_image and disk.base_image.find('://') != -1: + disk.base_image = self._cache_file(disk.base_image) + + if node.cdrom: + if node.cdrom.isopath.find('://') != -1: + node.cdrom.isopath = self._cache_file(node.cdrom.isopath) + + for network in environment.networks: + logger.info("Building network %s" % network.name) + + network.ip_addresses = self.networks_pool.get() + + if network.pxe: + network.dhcp_server = True + tftp_path = os.path.join(environment.work_dir, "tftp") + if not os.path.exists(tftp_path): + os.mkdir(tftp_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + network.tftp_root_dir = tftp_path + + if network.dhcp_server: + allocated_addresses = [] + for interface in network.interfaces: + for address in interface.ip_addresses: + if address in network.ip_addresses: + allocated_addresses.append(address) + + next_address_index = 2 + for interface in network.interfaces: + if len(interface.ip_addresses) == 0: + while next_address_index < network.ip_addresses.numhosts and \ + network.ip_addresses[next_address_index] in allocated_addresses: + next_address_index += 1 + + if next_address_index >= network.ip_addresses.numhosts: + raise DevopsError, "Failed to allocated IP address for node '%s' in network '%s': no more addresses left" % (node.name, network.name) + + address = network.ip_addresses[next_address_index] + interface.ip_addresses.append(address) + allocated_addresses.append(next_address_index) + next_address_index += 1 + + network.dhcp_dynamic_address_start = network.ip_addresses[next_address_index] + network.dhcp_dynamic_address_end = network.ip_addresses[network.ip_addresses.numhosts-2] + + for network in environment.networks: + logger.info("Building network %s" % network.name) + + self.driver.create_network(network) + network.driver = self.driver + + for node in environment.nodes: + logger.info("Building node %s" % node.name) + + self._build_node(environment, node) + node.driver = self.driver + + for network in environment.networks: + self.driver.create_network(network) + network.start() + + environment.built = True + logger.info("Finished building environment %s" % environment.name) + + def destroy_environment(self, environment): + logger.info("Destroying environment %s" % environment.name) + + for node in environment.nodes: + logger.info("Destroying node %s" % node.name) + + node.stop() + + for snapshot in node.snapshots: + self.driver.delete_snapshot(node, snapshot) + + self.driver.delete_node(node) + del node.driver + + for network in environment.networks: + logger.info("Destroying network %s" % network.name) + + network.stop() + self.driver.delete_network(network) + del network.driver + + # FIXME + try: + self.networks_pool.put(network.ip_addresses) + except: + pass + + del environment.driver + + logger.info("Removing environment %s files" % environment.name) + + shutil.rmtree(environment.work_dir) + + logger.info("Finished destroying environment %s" % environment.name) + + def load_environment(self, environment_id): + env_work_dir = os.path.join(self.home_dir, 'environments', environment_id) + env_config_file = os.path.join(env_work_dir, 'config') + if not os.path.exists(env_config_file): + raise DevopsError, "Environment '%s' couldn't be found" % environment_id + + with file(env_config_file) as f: + data = f.read() + + environment = my_yaml.load(data) + + return environment + + def save_environment(self, environment): + data = my_yaml.dump(environment) + if not environment.built: + raise DevopsError, "Environment has not been built yet." + with file(os.path.join(environment.work_dir, 'config'), 'w') as f: + f.write(data) + + @property + def saved_environments(self): + saved_environments = [] + for path in glob.glob(os.path.join(self.home_dir, 'environments', '*')): + if os.path.exists(os.path.join(path, 'config')): + saved_environments.append(os.path.basename(path)) + return saved_environments + + def _reserve_networks(self): + logger.debug("Scanning for ip networks that are already taken") + with os.popen("ip route") as f: + for line in f: + words = line.split() + if len(words) == 0: + continue + if words[0] == 'default': + continue + address = ipaddr.IPv4Network(words[0]) + logger.debug("Reserving ip network %s" % address) + self.networks_pool.reserve(address) + + logger.debug("Finished scanning for taken ip networks") + + def _build_node(self, environment, node): + for disk in filter(lambda d: d.path is None, node.disks): + logger.debug("Creating disk file for node '%s'" % node.name) + fd, disk.path = tempfile.mkstemp( + prefix=environment.work_dir + '/disk', + suffix='.' + disk.format + ) + os.close(fd) + os.chmod(disk.path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH) + self.driver.create_disk(disk) + + logger.debug("Creating node '%s'" % node.name) + self.driver.create_node(node) + + def _cache_file(self, url): + cache_dir = os.path.join(self.home_dir, 'cache') + if not os.path.exists(cache_dir): + os.makedirs(cache_dir, 0755) + + cache_log_path = os.path.join(cache_dir, 'entries') + if os.path.exists(cache_log_path): + with file(cache_log_path) as f: + cache_entries = my_yaml.load(f.read()) + else: + cache_entries = dict() + + ext_cache_log_path = os.path.join(cache_dir, 'extended_entries') + if os.path.exists(ext_cache_log_path): + with file(ext_cache_log_path) as f: + extended_cache_entries = my_yaml.load(f.read()) + else: + extended_cache_entries = dict() + for key, value in cache_entries.items(): + if not extended_cache_entries.has_key(key): + extended_cache_entries[key] = {'cached-path':value} + + RFC1123_DATETIME_FORMAT = '%a, %d %b %Y %H:%M:%S %Z' + url_attrs = {} + cached_path = '' + local_mtime = 0 + + if extended_cache_entries.has_key(url): + url_attrs = extended_cache_entries[url] + cached_path = url_attrs['cached-path'] + + if url_attrs.has_key('last-modified'): + local_mtime = time.mktime( + time.strptime(url_attrs['last-modified'], RFC1123_DATETIME_FORMAT)) + + else: + logger.debug("Cache miss for '%s', downloading" % url) + + remote = urllib.urlopen(url) + msg = remote.info() + if msg.has_key('last-modified'): + url_mtime = time.mktime( + time.strptime(msg['last-modified'], RFC1123_DATETIME_FORMAT)) + else: + url_mtime = 0 + + if local_mtime >= url_mtime: + logger.debug("Cache is up to date for '%s': '%s'" % ( + url, cached_path)) + return cached_path + + elif cached_path != '': + logger.debug("Cache is old for '%s', downloading" % url) + + if not os.access(cached_path, os.W_OK): + try: + os.unlink(cached_path) + except Exception: + pass + fd, cached_path = tempfile.mkstemp(prefix=cache_dir+'/') + os.close(fd) + + with file(cached_path, 'w') as f: + shutil.copyfileobj(remote, f) + + os.chmod(cached_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) + + if msg.has_key('last-modified'): + url_attrs['last-modified'] = msg['last-modified'] + url_attrs['cached-path'] = cached_path + extended_cache_entries[url] = url_attrs + + with file(ext_cache_log_path, 'w') as f: + f.write(my_yaml.dump(extended_cache_entries)) + + cache_entries[url] = cached_path + + with file(cache_log_path, 'w') as f: + f.write(my_yaml.dump(cache_entries)) + + logger.debug("Cached '%s' to '%s'" % (url, cached_path)) + + return cached_path + diff --git a/devops/devops/driver/libvirt.py b/devops/devops/driver/libvirt.py index e0084c7a0..2f2faac64 100644 --- a/devops/devops/driver/libvirt.py +++ b/devops/devops/driver/libvirt.py @@ -1,402 +1,402 @@ -# vim: ts=4 sw=4 expandtab -import xml - -import os -import tempfile -import time -import subprocess, shlex -from collections import deque -from xmlbuilder import XMLBuilder -import ipaddr -import re - -import logging -logger = logging.getLogger('devops.libvirt') - -def index(p, seq): - for i in xrange(len(seq)): - if p(seq[i]): return i - return -1 - -def find(p, seq): - for item in seq: - if p(item): return item - return None - -def spec_priority(spec): - if spec.hypervisor == 'qemu': - return 0.5 - return 1.0 - -class DeploymentSpec: - def __repr__(self): - return "" % (self.arch, self.os_type, self.hypervisor, self.emulator) - -class LibvirtException(Exception): pass - -class LibvirtXMLBuilder: - - def build_network_xml(self, network): - network_xml = XMLBuilder('network') - network_xml.name(network.id) - network_xml.forward(mode='nat') - - if hasattr(network, 'ip_addresses') and not network.ip_addresses is None: - with network_xml.ip(address=str(network.ip_addresses[1]), prefix=str(network.ip_addresses.prefixlen)): - if network.pxe: - network_xml.tftp(root=network.tftp_root_dir) - if network.dhcp_server: - with network_xml.dhcp: - if hasattr(network, 'dhcp_dynamic_address_start'): - start = network.dhcp_dynamic_address_start - else: - start = network.ip_addresses[2] - - if hasattr(network, 'dhcp_dynamic_address_end'): - end = network.dhcp_dynamic_address_end - else: - end = network.ip_addresses[network.ip_addresses.numhosts-2] - - network_xml.range(start=str(start), end=str(end)) - for interface in network.interfaces: - address = find(lambda ip: ip in network.ip_addresses, interface.ip_addresses) - if address and interface.mac_address: - network_xml.host(mac=str(interface.mac_address), ip=str(address), name=interface.node.name) - if network.pxe: - network_xml.bootp(file="pxelinux.0") - - return str(network_xml) - - def build_node_xml(self, node, spec): - node_xml = XMLBuilder("domain", type=spec.hypervisor) - node_xml.name(node.id) - node_xml.vcpu(str(node.cpu)) - node_xml.memory(str(node.memory*1024), unit='KiB') - - with node_xml.os: - node_xml.type(spec.os_type, arch=node.arch) - for boot_dev in node.boot: - if boot_dev == 'disk': - node_xml.boot(dev="hd") - else: - node_xml.boot(dev=boot_dev) - - ide_disk_names = deque(['hd'+c for c in list('abcdefghijklmnopqrstuvwxyz')]) - serial_disk_names = deque(['sd'+c for c in list('abcdefghijklmnopqrstuvwxyz')]) - - - def disk_name(bus='ide'): - if str(bus) == 'ide': - return ide_disk_names.popleft() - return serial_disk_names.popleft() - - - with node_xml.devices: - node_xml.emulator(spec.emulator) - - if len(node.disks) > 0: - node_xml.controller(type="ide") - - for disk in node.disks: - with node_xml.disk(type="file", device="disk"): - node_xml.driver(name="qemu", type=disk.format, cache="unsafe") - node_xml.source(file=disk.path) - node_xml.target(dev=disk_name(disk.bus), bus=disk.bus) - - if node.cdrom: - with node_xml.disk(type="file", device="cdrom"): - node_xml.driver(name="qemu", type="raw") - node_xml.source(file=node.cdrom.isopath) - node_xml.target(dev=disk_name(node.cdrom.bus), bus=node.cdrom.bus) - - for interface in node.interfaces: - with node_xml.interface(type="network"): - node_xml.source(network=interface.network.id) - - if node.vnc: - node_xml.graphics(type='vnc', listen='0.0.0.0', autoport='yes') - - return str(node_xml) - - -class Libvirt: - def __init__(self, xml_builder = LibvirtXMLBuilder()): - self.xml_builder = xml_builder - self._init_capabilities() - - def node_exists(self, node_name): - return self._system("virsh dominfo '%s'" % node_name, expected_resultcodes=(0, 1)) == 0 - - def network_exists(self, network_name): - return self._system("virsh net-info '%s' 2>/dev/null" % network_name, expected_resultcodes=(0, 1)) == 0 - - def create_network(self, network): - if not hasattr(network, 'id') or network.id is None: - network.id = self._generate_network_id(network.name) - elif self.is_network_defined(network): - self._virsh("net-undefine '%s'", network.id) - - with tempfile.NamedTemporaryFile(delete=True) as xml_file: - network_xml = self.xml_builder.build_network_xml(network) - logger.debug("libvirt: Building network with following XML:\n%s" % network_xml) - xml_file.write(network_xml) - xml_file.flush() - self._virsh("net-define '%s'", xml_file.name) - - with os.popen("virsh net-dumpxml '%s'" % network.id) as f: - network_element = xml.parse_stream(f) - - network.bridge_name = network_element.find('bridge/@name') - network.mac_address = network_element.find('mac/@address') - - def delete_network(self, network): - if self.is_network_defined(network): - logger.debug("Network %s is defined. Undefining.") - self._virsh("net-undefine '%s'", network.id) - - - def start_network(self, network): - if not self.is_network_running(network): - logger.debug("Network %s is not running. Starting.") - self._virsh("net-start '%s'", network.id) - - def stop_network(self, network): - if self.is_network_running(network): - logger.debug("Network %s is running. Stopping.") - self._virsh("net-destroy '%s'", network.id) - - def _get_node_xml(self, node): - with os.popen("virsh dumpxml '%s'" % node.id) as f: - return xml.parse_stream(f) - - def create_node(self, node): - specs = filter(lambda spec: spec.arch == node.arch, self.specs) - if len(specs) == 0: - raise LibvirtException, "Can't create node %s: insufficient capabilities" % node.name - - specs.sort(key=spec_priority) - spec = specs[-1] - - if not hasattr(node, 'id') or node.id is None: - node.id = self._generate_node_id(node.name) - - with tempfile.NamedTemporaryFile(delete=True) as xml_file: - node_xml = self.xml_builder.build_node_xml(node, spec) - logger.debug("libvirt: Building node with following XML:\n%s" % node_xml) - xml_file.write(node_xml) - xml_file.flush() - self._virsh("define '%s'", xml_file.name) - - domain = self._get_node_xml(node) - - for interface_element in domain.find_all('devices/interface[@type="network"]'): - network_id = interface_element.find('source/@network') - - interface = find(lambda i: i.network.id == network_id, node.interfaces) - if interface is None: - continue - - interface.mac_address = interface_element.find('mac/@address') - - def delete_node(self, node): - if self.is_node_defined(node): - logger.debug("Node %s defined. Undefining." % node.id) - self._virsh("undefine '%s'", node.id) - - def start_node(self, node): - if not self.is_node_running(node): - logger.debug("Node %s is not running at the moment. Starting." % node.id) - self._virsh("start '%s'", node.id) - - if node.vnc: - domain = self._get_node_xml(node) - - port_text = domain.find('devices/graphics[@type="vnc"]/@port') - if port_text: node.vnc_port = int(port_text) - - - def stop_node(self, node): - if self.is_node_running(node): - logger.debug("Node %s is running at the moment. Stopping." % node.id) - self._virsh("destroy '%s'", node.id) - - - def reset_node(self, node): - self._virsh("reset '%s'", node.id) - - def reboot_node(self, node): - self._virsh("reboot '%s'", node.id) - - def shutdown_node(self, node): - self._virsh("stop '%s'", node.id) - - - def get_node_snapshots(self, node): - command = "virsh snapshot-list '%s'" % node.id - process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - process.wait() - if process.returncode != 0: - logger.error("Command '%s' returned %d, stderr: %s" % (command, process.returncode, process.stderr.read())) - else: - logger.debug("Command '%s' returned %d" % (command, process.returncode)) - - snapshot_ids = [] - for line in process.stdout.readlines()[2:]: - if line.strip() == '': continue - - snapshot_ids.append(line.split()[0]) - - return snapshot_ids - - def create_snapshot(self, node, name=None, description=None): - if not name: - name = str(int(time.time()*100)) - - with tempfile.NamedTemporaryFile(delete=True) as xml_file: - snapshot_xml = XMLBuilder('domainsnapshot') - snapshot_xml.name(name) - if description: - snapshot_xml.description(description) - - logger.debug("Building snapshot with following XML:\n%s" % str(snapshot_xml)) - xml_file.write(str(snapshot_xml)) - xml_file.flush() - - self._virsh("snapshot-create '%s' '%s'", node.id, xml_file.name) - - return name - - def revert_snapshot(self, node, snapshot_name=None): - if not snapshot_name: - snapshot_name = '--current' - self._virsh("snapshot-revert '%s' %s", node.id, snapshot_name) - - def delete_snapshot(self, node, snapshot_name=None): - if not snapshot_name: - snapshot_name = '--current' - self._virsh("snapshot-delete '%s' %s", node.id, snapshot_name) - - def send_keys_to_node(self, node, keys): - keys = scancodes.from_string(str(keys)) - for key_codes in keys: - if isinstance(key_codes[0], str): - if key_codes[0] == 'wait': - time.sleep(1) - - continue - - self._virsh("send-key '%s' %s", node.id, ' '.join(map(lambda x: str(x), key_codes))) - - def create_disk(self, disk): - if not disk.path: - f, disk.path = tempfile.mkstemp(prefix='disk-', suffix=(".%s" % disk.format)) - os.close(f) - - if disk.base_image: - self._system("qemu-img create -f '%(format)s' -b '%(backing_path)s' '%(path)s'" % {'format': disk.format, 'path': disk.path, 'backing_path': disk.base_image}) - else: - self._system("qemu-img create -f '%(format)s' '%(path)s' '%(size)s'" % {'format': disk.format, 'path': disk.path, 'size': disk.size}) - - - def delete_disk(self, disk): - if disk.path is None: return - - os.unlink(disk.path) - - def get_interface_addresses(self, interface): - command = "arp -an | awk '$4 == \"%(mac)s\" && $7 == \"%(interface)s\" {print substr($2, 2, length($2)-2)}'" % { 'mac': interface.mac_address, 'interface': interface.network.bridge_name} - with os.popen(command) as f: - return [ipaddr.IPv4Address(s) for s in f.read().split()] - - def _virsh(self, format, *args): - command = ("virsh " + format) % args - logger.debug("Running '%s'" % command) - process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - process.wait() - if process.returncode != 0: - logger.error("Command '%s' returned code %d:\n%s" % (command, process.returncode, process.stderr.read())) - raise LibvirtException, "Failed to execute command '%s'" % command - - def _init_capabilities(self): - with os.popen("virsh capabilities") as f: - capabilities = xml.parse_stream(f) - - self.specs = [] - - for guest in capabilities.find_all('guest'): - for arch in guest.find_all('arch'): - for domain in arch.find_all('domain'): - spec = DeploymentSpec() - spec.arch = arch['name'] - spec.os_type = guest.find('os_type/text()') - spec.hypervisor = domain['type'] - spec.emulator = (domain.find('emulator') or arch.find('emulator')).text - - self.specs.append(spec) - - def _generate_network_id(self, name='net'): - while True: - id = name + '-' + str(int(time.time()*100)) - if not self.network_exists(id): - return id - - def _generate_node_id(self, name='node'): - while True: - id = name + '-' + str(int(time.time()*100)) - if not self.node_exists(id): - return id - - - def is_node_defined(self, node): - return self._system2("virsh list --all | grep -q ' %s '" % node.id, expected_resultcodes=(0, 1)) == 0 - - def is_node_running(self, node): - return self._system2("virsh list | grep -q ' %s '" % node.id, expected_resultcodes=(0, 1)) == 0 - - def is_network_defined(self, network): - return self._system2("virsh net-list --all | grep -q '%s '" % network.id, expected_resultcodes=(0, 1)) == 0 - - def is_network_running(self, network): - return self._system2("virsh net-list | grep -q '%s '" % network.id, expected_resultcodes=(0, 1)) == 0 - - def _system2(self, command, expected_resultcodes=(0,)): - logger.debug("Running %s" % command) - - commands = [ i.strip() for i in re.split(ur'\|', command)] - serr = [] - - process = [] - process.append(subprocess.Popen(shlex.split(commands[0]), stdin=None, - stdout=subprocess.PIPE, stderr=subprocess.PIPE)) - for c in commands[1:]: - process.append(subprocess.Popen(shlex.split(c), stdin=process[-1].stdout, - stdout=subprocess.PIPE, stderr=subprocess.PIPE)) - - process[-1].wait() - - for p in process: - serr += [ err.strip() for err in p.stderr.readlines() ] - - returncode = process[-1].returncode - - if expected_resultcodes and not returncode in expected_resultcodes: - logger.error("Command '%s' returned %d, stderr: %s" % (command, returncode, '\n'.join(serr))) - else: - logger.debug("Command '%s' returned %d" % (command, returncode)) - - return returncode - - def _system(self, command, expected_resultcodes=(0,)): - logger.debug("Running '%s'" % command) - serr = [] - process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - process.wait() - serr += [ err.strip() for err in process.stderr.readlines() ] - - if expected_resultcodes and not process.returncode in expected_resultcodes: - logger.error("Command '%s' returned %d, stderr: %s" % (command, process.returncode, '\n'.join(serr))) - else: - logger.debug("Command '%s' returned %d" % (command, process.returncode)) - - return process.returncode - +# vim: ts=4 sw=4 expandtab +import xml + +import os +import tempfile +import time +import subprocess, shlex +from collections import deque +from xmlbuilder import XMLBuilder +import ipaddr +import re + +import logging +logger = logging.getLogger('devops.libvirt') + +def index(p, seq): + for i in xrange(len(seq)): + if p(seq[i]): return i + return -1 + +def find(p, seq): + for item in seq: + if p(item): return item + return None + +def spec_priority(spec): + if spec.hypervisor == 'qemu': + return 0.5 + return 1.0 + +class DeploymentSpec: + def __repr__(self): + return "" % (self.arch, self.os_type, self.hypervisor, self.emulator) + +class LibvirtException(Exception): pass + +class LibvirtXMLBuilder: + + def build_network_xml(self, network): + network_xml = XMLBuilder('network') + network_xml.name(network.id) + network_xml.forward(mode='nat') + + if hasattr(network, 'ip_addresses') and not network.ip_addresses is None: + with network_xml.ip(address=str(network.ip_addresses[1]), prefix=str(network.ip_addresses.prefixlen)): + if network.pxe: + network_xml.tftp(root=network.tftp_root_dir) + if network.dhcp_server: + with network_xml.dhcp: + if hasattr(network, 'dhcp_dynamic_address_start'): + start = network.dhcp_dynamic_address_start + else: + start = network.ip_addresses[2] + + if hasattr(network, 'dhcp_dynamic_address_end'): + end = network.dhcp_dynamic_address_end + else: + end = network.ip_addresses[network.ip_addresses.numhosts-2] + + network_xml.range(start=str(start), end=str(end)) + for interface in network.interfaces: + address = find(lambda ip: ip in network.ip_addresses, interface.ip_addresses) + if address and interface.mac_address: + network_xml.host(mac=str(interface.mac_address), ip=str(address), name=interface.node.name) + if network.pxe: + network_xml.bootp(file="pxelinux.0") + + return str(network_xml) + + def build_node_xml(self, node, spec): + node_xml = XMLBuilder("domain", type=spec.hypervisor) + node_xml.name(node.id) + node_xml.vcpu(str(node.cpu)) + node_xml.memory(str(node.memory*1024), unit='KiB') + + with node_xml.os: + node_xml.type(spec.os_type, arch=node.arch) + for boot_dev in node.boot: + if boot_dev == 'disk': + node_xml.boot(dev="hd") + else: + node_xml.boot(dev=boot_dev) + + ide_disk_names = deque(['hd'+c for c in list('abcdefghijklmnopqrstuvwxyz')]) + serial_disk_names = deque(['sd'+c for c in list('abcdefghijklmnopqrstuvwxyz')]) + + + def disk_name(bus='ide'): + if str(bus) == 'ide': + return ide_disk_names.popleft() + return serial_disk_names.popleft() + + + with node_xml.devices: + node_xml.emulator(spec.emulator) + + if len(node.disks) > 0: + node_xml.controller(type="ide") + + for disk in node.disks: + with node_xml.disk(type="file", device="disk"): + node_xml.driver(name="qemu", type=disk.format, cache="unsafe") + node_xml.source(file=disk.path) + node_xml.target(dev=disk_name(disk.bus), bus=disk.bus) + + if node.cdrom: + with node_xml.disk(type="file", device="cdrom"): + node_xml.driver(name="qemu", type="raw") + node_xml.source(file=node.cdrom.isopath) + node_xml.target(dev=disk_name(node.cdrom.bus), bus=node.cdrom.bus) + + for interface in node.interfaces: + with node_xml.interface(type="network"): + node_xml.source(network=interface.network.id) + + if node.vnc: + node_xml.graphics(type='vnc', listen='0.0.0.0', autoport='yes') + + return str(node_xml) + + +class Libvirt: + def __init__(self, xml_builder = LibvirtXMLBuilder()): + self.xml_builder = xml_builder + self._init_capabilities() + + def node_exists(self, node_name): + return self._system("virsh dominfo '%s'" % node_name, expected_resultcodes=(0, 1)) == 0 + + def network_exists(self, network_name): + return self._system("virsh net-info '%s' 2>/dev/null" % network_name, expected_resultcodes=(0, 1)) == 0 + + def create_network(self, network): + if not hasattr(network, 'id') or network.id is None: + network.id = self._generate_network_id(network.name) + elif self.is_network_defined(network): + self._virsh("net-undefine '%s'", network.id) + + with tempfile.NamedTemporaryFile(delete=True) as xml_file: + network_xml = self.xml_builder.build_network_xml(network) + logger.debug("libvirt: Building network with following XML:\n%s" % network_xml) + xml_file.write(network_xml) + xml_file.flush() + self._virsh("net-define '%s'", xml_file.name) + + with os.popen("virsh net-dumpxml '%s'" % network.id) as f: + network_element = xml.parse_stream(f) + + network.bridge_name = network_element.find('bridge/@name') + network.mac_address = network_element.find('mac/@address') + + def delete_network(self, network): + if self.is_network_defined(network): + logger.debug("Network %s is defined. Undefining.") + self._virsh("net-undefine '%s'", network.id) + + + def start_network(self, network): + if not self.is_network_running(network): + logger.debug("Network %s is not running. Starting.") + self._virsh("net-start '%s'", network.id) + + def stop_network(self, network): + if self.is_network_running(network): + logger.debug("Network %s is running. Stopping.") + self._virsh("net-destroy '%s'", network.id) + + def _get_node_xml(self, node): + with os.popen("virsh dumpxml '%s'" % node.id) as f: + return xml.parse_stream(f) + + def create_node(self, node): + specs = filter(lambda spec: spec.arch == node.arch, self.specs) + if len(specs) == 0: + raise LibvirtException, "Can't create node %s: insufficient capabilities" % node.name + + specs.sort(key=spec_priority) + spec = specs[-1] + + if not hasattr(node, 'id') or node.id is None: + node.id = self._generate_node_id(node.name) + + with tempfile.NamedTemporaryFile(delete=True) as xml_file: + node_xml = self.xml_builder.build_node_xml(node, spec) + logger.debug("libvirt: Building node with following XML:\n%s" % node_xml) + xml_file.write(node_xml) + xml_file.flush() + self._virsh("define '%s'", xml_file.name) + + domain = self._get_node_xml(node) + + for interface_element in domain.find_all('devices/interface[@type="network"]'): + network_id = interface_element.find('source/@network') + + interface = find(lambda i: i.network.id == network_id, node.interfaces) + if interface is None: + continue + + interface.mac_address = interface_element.find('mac/@address') + + def delete_node(self, node): + if self.is_node_defined(node): + logger.debug("Node %s defined. Undefining." % node.id) + self._virsh("undefine '%s'", node.id) + + def start_node(self, node): + if not self.is_node_running(node): + logger.debug("Node %s is not running at the moment. Starting." % node.id) + self._virsh("start '%s'", node.id) + + if node.vnc: + domain = self._get_node_xml(node) + + port_text = domain.find('devices/graphics[@type="vnc"]/@port') + if port_text: node.vnc_port = int(port_text) + + + def stop_node(self, node): + if self.is_node_running(node): + logger.debug("Node %s is running at the moment. Stopping." % node.id) + self._virsh("destroy '%s'", node.id) + + + def reset_node(self, node): + self._virsh("reset '%s'", node.id) + + def reboot_node(self, node): + self._virsh("reboot '%s'", node.id) + + def shutdown_node(self, node): + self._virsh("stop '%s'", node.id) + + + def get_node_snapshots(self, node): + command = "virsh snapshot-list '%s'" % node.id + process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process.wait() + if process.returncode != 0: + logger.error("Command '%s' returned %d, stderr: %s" % (command, process.returncode, process.stderr.read())) + else: + logger.debug("Command '%s' returned %d" % (command, process.returncode)) + + snapshot_ids = [] + for line in process.stdout.readlines()[2:]: + if line.strip() == '': continue + + snapshot_ids.append(line.split()[0]) + + return snapshot_ids + + def create_snapshot(self, node, name=None, description=None): + if not name: + name = str(int(time.time()*100)) + + with tempfile.NamedTemporaryFile(delete=True) as xml_file: + snapshot_xml = XMLBuilder('domainsnapshot') + snapshot_xml.name(name) + if description: + snapshot_xml.description(description) + + logger.debug("Building snapshot with following XML:\n%s" % str(snapshot_xml)) + xml_file.write(str(snapshot_xml)) + xml_file.flush() + + self._virsh("snapshot-create '%s' '%s'", node.id, xml_file.name) + + return name + + def revert_snapshot(self, node, snapshot_name=None): + if not snapshot_name: + snapshot_name = '--current' + self._virsh("snapshot-revert '%s' %s", node.id, snapshot_name) + + def delete_snapshot(self, node, snapshot_name=None): + if not snapshot_name: + snapshot_name = '--current' + self._virsh("snapshot-delete '%s' %s", node.id, snapshot_name) + + def send_keys_to_node(self, node, keys): + keys = scancodes.from_string(str(keys)) + for key_codes in keys: + if isinstance(key_codes[0], str): + if key_codes[0] == 'wait': + time.sleep(1) + + continue + + self._virsh("send-key '%s' %s", node.id, ' '.join(map(lambda x: str(x), key_codes))) + + def create_disk(self, disk): + if not disk.path: + f, disk.path = tempfile.mkstemp(prefix='disk-', suffix=(".%s" % disk.format)) + os.close(f) + + if disk.base_image: + self._system("qemu-img create -f '%(format)s' -b '%(backing_path)s' '%(path)s'" % {'format': disk.format, 'path': disk.path, 'backing_path': disk.base_image}) + else: + self._system("qemu-img create -f '%(format)s' '%(path)s' '%(size)s'" % {'format': disk.format, 'path': disk.path, 'size': disk.size}) + + + def delete_disk(self, disk): + if disk.path is None: return + + os.unlink(disk.path) + + def get_interface_addresses(self, interface): + command = "arp -an | awk '$4 == \"%(mac)s\" && $7 == \"%(interface)s\" {print substr($2, 2, length($2)-2)}'" % { 'mac': interface.mac_address, 'interface': interface.network.bridge_name} + with os.popen(command) as f: + return [ipaddr.IPv4Address(s) for s in f.read().split()] + + def _virsh(self, format, *args): + command = ("virsh " + format) % args + logger.debug("Running '%s'" % command) + process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process.wait() + if process.returncode != 0: + logger.error("Command '%s' returned code %d:\n%s" % (command, process.returncode, process.stderr.read())) + raise LibvirtException, "Failed to execute command '%s'" % command + + def _init_capabilities(self): + with os.popen("virsh capabilities") as f: + capabilities = xml.parse_stream(f) + + self.specs = [] + + for guest in capabilities.find_all('guest'): + for arch in guest.find_all('arch'): + for domain in arch.find_all('domain'): + spec = DeploymentSpec() + spec.arch = arch['name'] + spec.os_type = guest.find('os_type/text()') + spec.hypervisor = domain['type'] + spec.emulator = (domain.find('emulator') or arch.find('emulator')).text + + self.specs.append(spec) + + def _generate_network_id(self, name='net'): + while True: + id = name + '-' + str(int(time.time()*100)) + if not self.network_exists(id): + return id + + def _generate_node_id(self, name='node'): + while True: + id = name + '-' + str(int(time.time()*100)) + if not self.node_exists(id): + return id + + + def is_node_defined(self, node): + return self._system2("virsh list --all | grep -q ' %s '" % node.id, expected_resultcodes=(0, 1)) == 0 + + def is_node_running(self, node): + return self._system2("virsh list | grep -q ' %s '" % node.id, expected_resultcodes=(0, 1)) == 0 + + def is_network_defined(self, network): + return self._system2("virsh net-list --all | grep -q '%s '" % network.id, expected_resultcodes=(0, 1)) == 0 + + def is_network_running(self, network): + return self._system2("virsh net-list | grep -q '%s '" % network.id, expected_resultcodes=(0, 1)) == 0 + + def _system2(self, command, expected_resultcodes=(0,)): + logger.debug("Running %s" % command) + + commands = [ i.strip() for i in re.split(ur'\|', command)] + serr = [] + + process = [] + process.append(subprocess.Popen(shlex.split(commands[0]), stdin=None, + stdout=subprocess.PIPE, stderr=subprocess.PIPE)) + for c in commands[1:]: + process.append(subprocess.Popen(shlex.split(c), stdin=process[-1].stdout, + stdout=subprocess.PIPE, stderr=subprocess.PIPE)) + + process[-1].wait() + + for p in process: + serr += [ err.strip() for err in p.stderr.readlines() ] + + returncode = process[-1].returncode + + if expected_resultcodes and not returncode in expected_resultcodes: + logger.error("Command '%s' returned %d, stderr: %s" % (command, returncode, '\n'.join(serr))) + else: + logger.debug("Command '%s' returned %d" % (command, returncode)) + + return returncode + + def _system(self, command, expected_resultcodes=(0,)): + logger.debug("Running '%s'" % command) + serr = [] + process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process.wait() + serr += [ err.strip() for err in process.stderr.readlines() ] + + if expected_resultcodes and not process.returncode in expected_resultcodes: + logger.error("Command '%s' returned %d, stderr: %s" % (command, process.returncode, '\n'.join(serr))) + else: + logger.debug("Command '%s' returned %d" % (command, process.returncode)) + + return process.returncode + diff --git a/devops/devops/helpers.py b/devops/devops/helpers.py index bcb434cae..433f538fb 100644 --- a/devops/devops/helpers.py +++ b/devops/devops/helpers.py @@ -1,289 +1,289 @@ -import os -import os.path -import urllib -import stat -import socket -import time -import httplib -import xmlrpclib -import paramiko -import string -import random -from threading import Thread - -import BaseHTTPServer -from SimpleHTTPServer import SimpleHTTPRequestHandler -import posixpath - -import logging -from devops.error import DevopsError - -logger = logging.getLogger('devops.helpers') - -class TimeoutError(Exception): pass -class AuthenticationError(Exception): pass - -def get_free_port(): - ports = range(32000, 32100) - random.shuffle(ports) - for port in ports: - if not tcp_ping('localhost', port): - return port - raise DevopsError, "No free ports available" - -def icmp_ping(host, timeout=1): - "icmp_ping(host, timeout=1) - returns True if host is pingable; False - otherwise." - return os.system("ping -c 1 -W '%(timeout)d' '%(host)s' 1>/dev/null 2>&1" % { 'host': str(host), 'timeout': timeout}) == 0 - -def tcp_ping(host, port): - "tcp_ping(host, port) - returns True if TCP connection to specified host and port can be established; False - otherwise." - s = socket.socket() - try: - s.connect((str(host), int(port))) - except socket.error: - return False - s.close() - return True - -def wait(predicate, interval=5, timeout=None): - """ - wait(predicate, interval=5, timeout=None) - wait until predicate will become True. Returns number of seconds that is left or 0 if timeout is None. - Options: - interval - seconds between checks. - timeout - raise TimeoutError if predicate won't become True after this amount of seconds. 'None' disables timeout. - """ - start_time = time.time() - while not predicate(): - if timeout and start_time + timeout < time.time(): - raise TimeoutError, "Waiting timed out" - - seconds_to_sleep = interval - if timeout: - seconds_to_sleep = max(0, min(seconds_to_sleep, start_time + timeout - time.time())) - time.sleep(seconds_to_sleep) - - return timeout + start_time - time.time() if timeout else 0 - -def http(host='localhost', port=80, method='GET', url='/', waited_code=200): - try: - conn = httplib.HTTPConnection(str(host), int(port)) - conn.request(method, url) - res = conn.getresponse() - - if res.status == waited_code: - return True - return False - except: - return False - - -class KeyPolicy(paramiko.WarningPolicy): - def missing_host_key(self, client, hostname, key): - return - -class SSHClient(object): - class get_sudo(object): - def __init__(self, ssh): - self.ssh = ssh - - def __enter__(self): - self.ssh.sudo_mode = True - - def __exit__(self, type, value, traceback): - self.ssh.sudo_mode = False - - def __init__(self, host, port=22, username=None, password=None): - self.host = str(host) - self.port = int(port) - self.username = username - self.password = password - - self.sudo_mode = False - self.sudo = self.get_sudo(self) - - self.reconnect() - - def __del__(self): - self._sftp.close() - self._ssh.close() - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - pass - - def reconnect(self): - self._ssh = paramiko.SSHClient() - self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self._ssh.connect(self.host, port=self.port, username=self.username, password=self.password) - self._sftp = self._ssh.open_sftp() - - def execute(self, command): - logger.debug("Executing command: '%s'" % command.rstrip()) - chan = self._ssh.get_transport().open_session() - stdin = chan.makefile('wb') - stdout = chan.makefile('rb') - stderr = chan.makefile_stderr('rb') - cmd = "%s\n" % command - if self.sudo_mode: - cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"') - chan.exec_command(cmd) - if stdout.channel.closed is False: - stdin.write('%s\n' % self.password) - stdin.flush() - result = { - 'stdout': [], - 'stderr': [], - 'exit_code': chan.recv_exit_status() - } - for line in stdout: - result['stdout'].append(line) - for line in stderr: - result['stderr'].append(line) - - chan.close() - - return result - - def mkdir(self, path): - if self.exists(path): - return - logger.debug("Creating directory: %s" % path) - self.execute("mkdir -p %s\n" % path) - - def rm_rf(self, path): - logger.debug("Removing directory: %s" % path) - self.execute("rm -rf %s" % path) - - def open(self, path, mode='r'): - return self._sftp.open(path, mode) - - def upload(self, source, target): - logger.debug("Copying '%s' -> '%s'" % (source, target)) - - if self.isdir(target): - target = posixpath.join(target, os.path.basename(source)) - - if not os.path.isdir(source): - self._sftp.put(source, target) - return - - for rootdir, subdirs, files in os.walk(source): - targetdir = os.path.normpath(os.path.join(target, os.path.relpath(rootdir, source))).replace("\\", "/") - - self.mkdir(targetdir) - - for entry in files: - local_path = os.path.join(rootdir, entry) - remote_path = posixpath.join(targetdir, entry) - if self.exists(remote_path): - self._sftp.unlink(remote_path) - self._sftp.put(local_path, remote_path) - - def exists(self, path): - try: - self._sftp.lstat(path) - return True - except IOError: - return False - - def isfile(self, path): - try: - attrs = self._sftp.lstat(path) - return attrs.st_mode & stat.S_IFREG != 0 - except IOError: - return False - - def isdir(self, path): - try: - attrs = self._sftp.lstat(path) - return attrs.st_mode & stat.S_IFDIR != 0 - except IOError: - return False - - -def ssh(*args, **kwargs): - return SSHClient(*args, **kwargs) - - - -class HttpServer: - class Handler(SimpleHTTPRequestHandler): - logger = logging.getLogger('devops.helpers.http_server') - - def __init__(self, docroot, *args, **kwargs): - self.docroot = docroot - SimpleHTTPRequestHandler.__init__(self, *args, **kwargs) - - # Suppress reverse DNS lookups to speed up processing - def address_string(self): - return self.client_address[0] - - # Handle docroot - def translate_path(self, path): - """Translate a /-separated PATH to the local filename syntax. - - Components that mean special things to the local file system - (e.g. drive or directory names) are ignored. (XXX They should - probably be diagnosed.) - - """ - # abandon query parameters - path = path.split('?',1)[0] - path = path.split('#',1)[0] - path = posixpath.normpath(urllib.unquote(path)) - words = path.split('/') - words = filter(None, words) - path = self.docroot - for word in words: - drive, word = os.path.splitdrive(word) - head, word = os.path.split(word) - path = os.path.join(path, word) - return path - - def log_message(self, format, *args): - self.logger.info(format % args) - - def __init__(self, document_root): - self.port = get_free_port() - self.document_root = document_root - - def handler_factory(*args, **kwargs): - return HttpServer.Handler(document_root, *args, **kwargs) - - self._server = BaseHTTPServer.HTTPServer(('', self.port), handler_factory) - self._thread = Thread(target=self._server.serve_forever) - self._thread.daemon = True - - def start(self): - self._thread.start() - - def run(self): - self._thread.join() - - def stop(self): - self._server.shutdown() - self._thread.join() - -def http_server(document_root): - server = HttpServer(document_root) - server.start() - return server - - -def xmlrpctoken(uri, login, password): - server = xmlrpclib.Server(uri) - try: - return server.login(login, password) - except: - raise AuthenticationError, "Error occured while login process" - -def xmlrpcmethod(uri, method): - server = xmlrpclib.Server(uri) - try: - return getattr(server, method) - except: - raise AttributeError, "Error occured while getting server method" - - +import os +import os.path +import urllib +import stat +import socket +import time +import httplib +import xmlrpclib +import paramiko +import string +import random +from threading import Thread + +import BaseHTTPServer +from SimpleHTTPServer import SimpleHTTPRequestHandler +import posixpath + +import logging +from devops.error import DevopsError + +logger = logging.getLogger('devops.helpers') + +class TimeoutError(Exception): pass +class AuthenticationError(Exception): pass + +def get_free_port(): + ports = range(32000, 32100) + random.shuffle(ports) + for port in ports: + if not tcp_ping('localhost', port): + return port + raise DevopsError, "No free ports available" + +def icmp_ping(host, timeout=1): + "icmp_ping(host, timeout=1) - returns True if host is pingable; False - otherwise." + return os.system("ping -c 1 -W '%(timeout)d' '%(host)s' 1>/dev/null 2>&1" % { 'host': str(host), 'timeout': timeout}) == 0 + +def tcp_ping(host, port): + "tcp_ping(host, port) - returns True if TCP connection to specified host and port can be established; False - otherwise." + s = socket.socket() + try: + s.connect((str(host), int(port))) + except socket.error: + return False + s.close() + return True + +def wait(predicate, interval=5, timeout=None): + """ + wait(predicate, interval=5, timeout=None) - wait until predicate will become True. Returns number of seconds that is left or 0 if timeout is None. + Options: + interval - seconds between checks. + timeout - raise TimeoutError if predicate won't become True after this amount of seconds. 'None' disables timeout. + """ + start_time = time.time() + while not predicate(): + if timeout and start_time + timeout < time.time(): + raise TimeoutError, "Waiting timed out" + + seconds_to_sleep = interval + if timeout: + seconds_to_sleep = max(0, min(seconds_to_sleep, start_time + timeout - time.time())) + time.sleep(seconds_to_sleep) + + return timeout + start_time - time.time() if timeout else 0 + +def http(host='localhost', port=80, method='GET', url='/', waited_code=200): + try: + conn = httplib.HTTPConnection(str(host), int(port)) + conn.request(method, url) + res = conn.getresponse() + + if res.status == waited_code: + return True + return False + except: + return False + + +class KeyPolicy(paramiko.WarningPolicy): + def missing_host_key(self, client, hostname, key): + return + +class SSHClient(object): + class get_sudo(object): + def __init__(self, ssh): + self.ssh = ssh + + def __enter__(self): + self.ssh.sudo_mode = True + + def __exit__(self, type, value, traceback): + self.ssh.sudo_mode = False + + def __init__(self, host, port=22, username=None, password=None): + self.host = str(host) + self.port = int(port) + self.username = username + self.password = password + + self.sudo_mode = False + self.sudo = self.get_sudo(self) + + self.reconnect() + + def __del__(self): + self._sftp.close() + self._ssh.close() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + pass + + def reconnect(self): + self._ssh = paramiko.SSHClient() + self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self._ssh.connect(self.host, port=self.port, username=self.username, password=self.password) + self._sftp = self._ssh.open_sftp() + + def execute(self, command): + logger.debug("Executing command: '%s'" % command.rstrip()) + chan = self._ssh.get_transport().open_session() + stdin = chan.makefile('wb') + stdout = chan.makefile('rb') + stderr = chan.makefile_stderr('rb') + cmd = "%s\n" % command + if self.sudo_mode: + cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"') + chan.exec_command(cmd) + if stdout.channel.closed is False: + stdin.write('%s\n' % self.password) + stdin.flush() + result = { + 'stdout': [], + 'stderr': [], + 'exit_code': chan.recv_exit_status() + } + for line in stdout: + result['stdout'].append(line) + for line in stderr: + result['stderr'].append(line) + + chan.close() + + return result + + def mkdir(self, path): + if self.exists(path): + return + logger.debug("Creating directory: %s" % path) + self.execute("mkdir -p %s\n" % path) + + def rm_rf(self, path): + logger.debug("Removing directory: %s" % path) + self.execute("rm -rf %s" % path) + + def open(self, path, mode='r'): + return self._sftp.open(path, mode) + + def upload(self, source, target): + logger.debug("Copying '%s' -> '%s'" % (source, target)) + + if self.isdir(target): + target = posixpath.join(target, os.path.basename(source)) + + if not os.path.isdir(source): + self._sftp.put(source, target) + return + + for rootdir, subdirs, files in os.walk(source): + targetdir = os.path.normpath(os.path.join(target, os.path.relpath(rootdir, source))).replace("\\", "/") + + self.mkdir(targetdir) + + for entry in files: + local_path = os.path.join(rootdir, entry) + remote_path = posixpath.join(targetdir, entry) + if self.exists(remote_path): + self._sftp.unlink(remote_path) + self._sftp.put(local_path, remote_path) + + def exists(self, path): + try: + self._sftp.lstat(path) + return True + except IOError: + return False + + def isfile(self, path): + try: + attrs = self._sftp.lstat(path) + return attrs.st_mode & stat.S_IFREG != 0 + except IOError: + return False + + def isdir(self, path): + try: + attrs = self._sftp.lstat(path) + return attrs.st_mode & stat.S_IFDIR != 0 + except IOError: + return False + + +def ssh(*args, **kwargs): + return SSHClient(*args, **kwargs) + + + +class HttpServer: + class Handler(SimpleHTTPRequestHandler): + logger = logging.getLogger('devops.helpers.http_server') + + def __init__(self, docroot, *args, **kwargs): + self.docroot = docroot + SimpleHTTPRequestHandler.__init__(self, *args, **kwargs) + + # Suppress reverse DNS lookups to speed up processing + def address_string(self): + return self.client_address[0] + + # Handle docroot + def translate_path(self, path): + """Translate a /-separated PATH to the local filename syntax. + + Components that mean special things to the local file system + (e.g. drive or directory names) are ignored. (XXX They should + probably be diagnosed.) + + """ + # abandon query parameters + path = path.split('?',1)[0] + path = path.split('#',1)[0] + path = posixpath.normpath(urllib.unquote(path)) + words = path.split('/') + words = filter(None, words) + path = self.docroot + for word in words: + drive, word = os.path.splitdrive(word) + head, word = os.path.split(word) + path = os.path.join(path, word) + return path + + def log_message(self, format, *args): + self.logger.info(format % args) + + def __init__(self, document_root): + self.port = get_free_port() + self.document_root = document_root + + def handler_factory(*args, **kwargs): + return HttpServer.Handler(document_root, *args, **kwargs) + + self._server = BaseHTTPServer.HTTPServer(('', self.port), handler_factory) + self._thread = Thread(target=self._server.serve_forever) + self._thread.daemon = True + + def start(self): + self._thread.start() + + def run(self): + self._thread.join() + + def stop(self): + self._server.shutdown() + self._thread.join() + +def http_server(document_root): + server = HttpServer(document_root) + server.start() + return server + + +def xmlrpctoken(uri, login, password): + server = xmlrpclib.Server(uri) + try: + return server.login(login, password) + except: + raise AuthenticationError, "Error occured while login process" + +def xmlrpcmethod(uri, method): + server = xmlrpclib.Server(uri) + try: + return getattr(server, method) + except: + raise AttributeError, "Error occured while getting server method" + + diff --git a/devops/devops/main.py b/devops/devops/main.py index d1ea6a026..ef11d90d1 100644 --- a/devops/devops/main.py +++ b/devops/devops/main.py @@ -1,39 +1,39 @@ -import logging -from error import DevopsError -from controller import Controller -from driver.libvirt import Libvirt -import yaml_config_loader - -__all__ = ['logger', 'getController', 'build', 'destroy', 'load', 'save'] - -logger = logging.getLogger('devops') - -class ControllerSingleton(Controller): - _instance = None - def __new__(cls, *args, **kwargs): - if not cls._instance: - cls._instance = super(ControllerSingleton, cls).__new__( - cls, *args, **kwargs) - return cls._instance - -def getController(): - return ControllerSingleton(Libvirt()) - -def build(environment): - getController().build_environment(environment) - -def destroy(environment): - getController().destroy_environment(environment) - -def load(source): - source = str(source).strip() - if source.find("\n") == -1: - if not source in getController().saved_environments: - raise DevopsError, "Environment '%s' does not exist" % source - return getController().load_environment(source) - - return yaml_config_loader.load(source) - -def save(environment): - getController().save_environment(environment) - +import logging +from error import DevopsError +from controller import Controller +from driver.libvirt import Libvirt +import yaml_config_loader + +__all__ = ['logger', 'getController', 'build', 'destroy', 'load', 'save'] + +logger = logging.getLogger('devops') + +class ControllerSingleton(Controller): + _instance = None + def __new__(cls, *args, **kwargs): + if not cls._instance: + cls._instance = super(ControllerSingleton, cls).__new__( + cls, *args, **kwargs) + return cls._instance + +def getController(): + return ControllerSingleton(Libvirt()) + +def build(environment): + getController().build_environment(environment) + +def destroy(environment): + getController().destroy_environment(environment) + +def load(source): + source = str(source).strip() + if source.find("\n") == -1: + if not source in getController().saved_environments: + raise DevopsError, "Environment '%s' does not exist" % source + return getController().load_environment(source) + + return yaml_config_loader.load(source) + +def save(environment): + getController().save_environment(environment) + diff --git a/devops/devops/network.py b/devops/devops/network.py index 3eb59be16..580ef288b 100644 --- a/devops/devops/network.py +++ b/devops/devops/network.py @@ -1,44 +1,44 @@ -import ipaddr -from itertools import chain - -IPv4Address = ipaddr.IPv4Address -IPv4Network = ipaddr.IPv4Network - -class NetworkPoolException(Exception): pass - -class IpNetworksPool: - def __init__(self, net_addresses=None, prefix=24): - if not net_addresses: net_addresses = ['10.0.0.0/20'] - networks = [] - for address in net_addresses: - if not isinstance(address, IPv4Network): - address = IPv4Network(str(address)) - networks.append(address) - - self._available_networks = set(chain(*[net_address.iter_subnets(new_prefix=prefix) for net_address in networks])) - self._allocated_networks = set() - - def reserve(self, network): - for overlaping_network in filter(lambda n: n.overlaps(network), self._available_networks): - self._available_networks.remove(overlaping_network) - - def get(self): - "get() - allocates and returns network address" - x = self._available_networks.pop() - self._allocated_networks.add(x) - return x - - def put(self, network): - "put(net_address) - return network address to pool" - x = network - if x not in self._allocated_networks: - raise NetworkPoolException, "Network address '%s' wasn't previously allocated" % str(network) - - self._allocated_networks.remove(x) - self._available_networks.add(x) - - @property - def is_empty(self): - return len(self._available_networks) == 0 - - +import ipaddr +from itertools import chain + +IPv4Address = ipaddr.IPv4Address +IPv4Network = ipaddr.IPv4Network + +class NetworkPoolException(Exception): pass + +class IpNetworksPool: + def __init__(self, net_addresses=None, prefix=24): + if not net_addresses: net_addresses = ['10.0.0.0/20'] + networks = [] + for address in net_addresses: + if not isinstance(address, IPv4Network): + address = IPv4Network(str(address)) + networks.append(address) + + self._available_networks = set(chain(*[net_address.iter_subnets(new_prefix=prefix) for net_address in networks])) + self._allocated_networks = set() + + def reserve(self, network): + for overlaping_network in filter(lambda n: n.overlaps(network), self._available_networks): + self._available_networks.remove(overlaping_network) + + def get(self): + "get() - allocates and returns network address" + x = self._available_networks.pop() + self._allocated_networks.add(x) + return x + + def put(self, network): + "put(net_address) - return network address to pool" + x = network + if x not in self._allocated_networks: + raise NetworkPoolException, "Network address '%s' wasn't previously allocated" % str(network) + + self._allocated_networks.remove(x) + self._available_networks.add(x) + + @property + def is_empty(self): + return len(self._available_networks) == 0 + + diff --git a/devops/devops/yaml_config_loader.py b/devops/devops/yaml_config_loader.py index e50eff080..cec0e8ab4 100644 --- a/devops/devops/yaml_config_loader.py +++ b/devops/devops/yaml_config_loader.py @@ -1,158 +1,158 @@ -import re -import my_yaml -from model import Environment, Network, Node, Disk, Interface, Cdrom - -class ConfigError(Exception): pass - -def load(stream): - data = my_yaml.load(stream) - - if not data.has_key('nodes'): - raise ConfigError, "No nodes defined" - - name = 'default' - if data.has_key('name'): - name = data['name'] - environment = Environment(name) - - for network_data in (data.get('networks') or []): - parse_network(environment, network_data) - - for node_data in data['nodes']: - parse_node(environment, node_data) - - return environment - - -def dump(stream): - raise "Not implemented yet" - -def parse_network(environment, data): - if data.has_key('name'): - name = data['name'] - elif data.has_key('network'): - name = data['network'] - else: - raise ConfigError, "Unnamed network" - - network = Network(name) - - if data.has_key('dhcp_server'): - network.dhcp_server = data['dhcp_server'] - - for existing_network in environment.networks: - if existing_network.name == network.name: - raise ConfigError, "Network with given name already exists: %s" % network.name - - environment.networks.append(network) - - return network - -def parse_node(environment, data): - if data.has_key('name'): - name = data['name'] - elif data.has_key('node'): - name = data['node'] - else: - raise ConfigError, "Unnamed node" - - node = Node(name) - if data.has_key('cpu'): - node.cpu = data['cpu'] - if data.has_key('memory'): - node.memory = data['memory'] - - if data.has_key('vnc'): - node.vnc = data['vnc'] - - if data.has_key('cdrom'): - isopath = data['cdrom'] - if not isinstance(isopath, (str,)): - raise ConfigError, "It must be string containing path to iso image file" - - node.cdrom = Cdrom(isopath) - - - if data.has_key('disk'): - disks_data = data['disk'] - if type(disks_data) != list: - disks_data = (disks_data,) - - for disk_data in disks_data: - if type(disk_data) == str: - try: - size = parse_size(disk_data) - node.disks.append(Disk(size=size)) - except ValueError: - path = disk_data - node.disks.append(Disk(path=path)) - elif isinstance(disk_data, dict): - node.disks.append(Disk(**disk_data)) - else: - raise ConfigError, "Unknown disk config: %s" % str(disk_data) - - if data.has_key('networks'): - networks_data = data['networks'] - if type(networks_data) != list: - networks_data = (networks_data,) - - for network_data in networks_data: - if type(network_data) == str: - network = None - for n in environment.networks: - if n.name == network_data: - network = n - break - - # Inline networks - # if network is None: - # network = parse_network(environment, {'name': network_data}) - # self.networks.append(network) - - # TODO: add support for specifying additional network interface params (e.g. mac address) - - if network is None: - raise ConfigError, "Unknown network %s" % network_data - - node.interfaces.append(Interface(network)) - - if data.has_key('boot'): - boot_data = data['boot'] - if type(boot_data) != list: - boot_data = list(boot_data) - - for boot in boot_data: - if not boot in ('disk', 'network', 'cdrom'): - raise ConfigError, "Unknown boot option: %s" % boot - node.boot.append(boot) - else: - if len(node.disks) > 0: node.boot.append('disk') - if node.cdrom : node.boot.append('cdrom') - if len(node.interfaces) > 0: node.boot.append('network') - - for existing_node in environment.nodes: - if existing_node.name == node.name: - raise ConfigError, "Node with given name already exists: %s" % node.name - - environment.nodes.append(node) - - - -SIZE_RE = re.compile('^(\d+)\s*(|kb|k|mb|m|gb|g)$') - -def parse_size(s): - m = SIZE_RE.match(s.lower()) - if not m: - raise ValueError, "Invalid size format: %s" % s - - value = int(m.group(1)) - units = m.group(2) - if units in ['k', 'kb']: multiplier=1024 - elif units in ['m', 'mb']: multiplier=1024**2 - elif units in ['g', 'gb']: multiplier=1024**3 - elif units in ['t', 'tb']: multiplier=1024**4 - elif units == '': multiplier=1 - else: raise ValueError, "Invalid size format: %s" % units - - return value * multiplier - +import re +import my_yaml +from model import Environment, Network, Node, Disk, Interface, Cdrom + +class ConfigError(Exception): pass + +def load(stream): + data = my_yaml.load(stream) + + if not data.has_key('nodes'): + raise ConfigError, "No nodes defined" + + name = 'default' + if data.has_key('name'): + name = data['name'] + environment = Environment(name) + + for network_data in (data.get('networks') or []): + parse_network(environment, network_data) + + for node_data in data['nodes']: + parse_node(environment, node_data) + + return environment + + +def dump(stream): + raise "Not implemented yet" + +def parse_network(environment, data): + if data.has_key('name'): + name = data['name'] + elif data.has_key('network'): + name = data['network'] + else: + raise ConfigError, "Unnamed network" + + network = Network(name) + + if data.has_key('dhcp_server'): + network.dhcp_server = data['dhcp_server'] + + for existing_network in environment.networks: + if existing_network.name == network.name: + raise ConfigError, "Network with given name already exists: %s" % network.name + + environment.networks.append(network) + + return network + +def parse_node(environment, data): + if data.has_key('name'): + name = data['name'] + elif data.has_key('node'): + name = data['node'] + else: + raise ConfigError, "Unnamed node" + + node = Node(name) + if data.has_key('cpu'): + node.cpu = data['cpu'] + if data.has_key('memory'): + node.memory = data['memory'] + + if data.has_key('vnc'): + node.vnc = data['vnc'] + + if data.has_key('cdrom'): + isopath = data['cdrom'] + if not isinstance(isopath, (str,)): + raise ConfigError, "It must be string containing path to iso image file" + + node.cdrom = Cdrom(isopath) + + + if data.has_key('disk'): + disks_data = data['disk'] + if type(disks_data) != list: + disks_data = (disks_data,) + + for disk_data in disks_data: + if type(disk_data) == str: + try: + size = parse_size(disk_data) + node.disks.append(Disk(size=size)) + except ValueError: + path = disk_data + node.disks.append(Disk(path=path)) + elif isinstance(disk_data, dict): + node.disks.append(Disk(**disk_data)) + else: + raise ConfigError, "Unknown disk config: %s" % str(disk_data) + + if data.has_key('networks'): + networks_data = data['networks'] + if type(networks_data) != list: + networks_data = (networks_data,) + + for network_data in networks_data: + if type(network_data) == str: + network = None + for n in environment.networks: + if n.name == network_data: + network = n + break + + # Inline networks + # if network is None: + # network = parse_network(environment, {'name': network_data}) + # self.networks.append(network) + + # TODO: add support for specifying additional network interface params (e.g. mac address) + + if network is None: + raise ConfigError, "Unknown network %s" % network_data + + node.interfaces.append(Interface(network)) + + if data.has_key('boot'): + boot_data = data['boot'] + if type(boot_data) != list: + boot_data = list(boot_data) + + for boot in boot_data: + if not boot in ('disk', 'network', 'cdrom'): + raise ConfigError, "Unknown boot option: %s" % boot + node.boot.append(boot) + else: + if len(node.disks) > 0: node.boot.append('disk') + if node.cdrom : node.boot.append('cdrom') + if len(node.interfaces) > 0: node.boot.append('network') + + for existing_node in environment.nodes: + if existing_node.name == node.name: + raise ConfigError, "Node with given name already exists: %s" % node.name + + environment.nodes.append(node) + + + +SIZE_RE = re.compile('^(\d+)\s*(|kb|k|mb|m|gb|g)$') + +def parse_size(s): + m = SIZE_RE.match(s.lower()) + if not m: + raise ValueError, "Invalid size format: %s" % s + + value = int(m.group(1)) + units = m.group(2) + if units in ['k', 'kb']: multiplier=1024 + elif units in ['m', 'mb']: multiplier=1024**2 + elif units in ['g', 'gb']: multiplier=1024**3 + elif units in ['t', 'tb']: multiplier=1024**4 + elif units == '': multiplier=1 + else: raise ValueError, "Invalid size format: %s" % units + + return value * multiplier + diff --git a/devops/setup.py b/devops/setup.py index 33ce5c2a5..8e78c0c2e 100644 --- a/devops/setup.py +++ b/devops/setup.py @@ -1,12 +1,12 @@ -from distutils.core import setup - -setup( - name='devops', - version='0.1', - description='Library to aid creating and manipulating virtual environments', - author='Mirantis, Inc.', - author_email='product@mirantis.com', - packages=['devops', 'devops.driver'], - scripts=['bin/devops'], requires=['xmlbuilder', "ipaddr", "paramiko"] -) - +from distutils.core import setup + +setup( + name='devops', + version='0.1', + description='Library to aid creating and manipulating virtual environments', + author='Mirantis, Inc.', + author_email='product@mirantis.com', + packages=['devops', 'devops.driver'], + scripts=['bin/devops'], requires=['xmlbuilder', "ipaddr", "paramiko"] +) + diff --git a/test/helpers.py b/test/helpers.py index 268782083..f4e5c7957 100644 --- a/test/helpers.py +++ b/test/helpers.py @@ -1,134 +1,134 @@ -import os -import urllib2 -import logging -from unittest import TestCase -import paramiko -import posixpath - -logger = logging.getLogger('helpers') - -""" -Integration test helpers -""" -class HTTPClient(object): - def __init__(self): - self.opener = urllib2.build_opener(urllib2.HTTPHandler) - - def get(self, url, log=False): - req = urllib2.Request(url) - return self._open(req, log) - - def post(self, url, data="{}", content_type="application/json", log=False): - req = urllib2.Request(url, data=data) - req.add_header('Content-Type', content_type) - return self._open(req, log) - - def put(self, url, data="{}", content_type="application/json", log=False): - req = urllib2.Request(url, data=data) - req.add_header('Content-Type', content_type) - req.get_method = lambda: 'PUT' - return self._open(req, log) - - def _open(self, req, log): - try: - resp = self.opener.open(req) - content = resp.read() - except urllib2.HTTPError, error: - content = ": ".join([str(error.code), error.read()]) - if log: - logger.debug(content) - return content - - -class SSHClient(object): - class get_sudo(object): - def __init__(self, client): - self.client = client - - def __enter__(self): - self.client.sudo_mode = True - - def __exit__(self, type, value, traceback): - self.client.sudo_mode = False - - def __init__(self): - self.channel = None - self.sudo_mode = False - self.sudo = self.get_sudo(self) - self.established = False - - def connect_ssh(self, host, username, password): - if not self.established: - self.ssh_client = paramiko.SSHClient() - self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.host = host - self.username = username - self.password = password - self.ssh_client.connect(host, username=username, password=password) - self.sftp_client = self.ssh_client.open_sftp() - self.established = True - - - def execute(self, command): - logger.debug("Executing command: '%s'" % command.rstrip()) - chan = self.ssh_client.get_transport().open_session() - stdin = chan.makefile('wb') - stdout = chan.makefile('rb') - stderr = chan.makefile_stderr('rb') - cmd = "%s\n" % command - if self.sudo_mode: - cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"') - chan.exec_command(cmd) - if stdout.channel.closed is False: - stdin.write('%s\n' % self.password) - stdin.flush() - result = { - 'stdout': [], - 'stderr': [], - 'exit_status': chan.recv_exit_status() - } - for line in stdout: - result['stdout'].append(line) - for line in stderr: - result['stderr'].append(line) - - return result - - def mkdir(self, path): - logger.debug("Creating directory: %s" % path) - return self.execute("mkdir %s\n" % path) - - def rmdir(self, path): - logger.debug("Removing directory: %s" % path) - return self.execute("rm -rf %s" % path) - - def open(self, path, mode='r'): - return self.sftp_client.open(path, mode) - - def scp(self, frm, to): - logger.debug("Copying file: %s -> %s" % (frm, to)) - self.sftp_client.put(frm, to) - - def scp_d(self, frm, to): - logger.debug("Copying directory: %s -> %s" % (frm, to)) - remote_root = posixpath.join( - to, - os.path.basename(frm) - ) - for root, dirs, fls in os.walk(frm): - rel = os.path.relpath(root, frm).replace('\\','/') - if rel == ".": - curdir = remote_root - else: - curdir = posixpath.join(remote_root, rel) - self.mkdir(curdir) - for fl in fls: - self.scp( - os.path.join(root, fl), - posixpath.join(curdir, fl) - ) - - def disconnect(self): - self.sftp_client.close() - self.ssh_client.close() - self.established = False +import os +import urllib2 +import logging +from unittest import TestCase +import paramiko +import posixpath + +logger = logging.getLogger('helpers') + +""" +Integration test helpers +""" +class HTTPClient(object): + def __init__(self): + self.opener = urllib2.build_opener(urllib2.HTTPHandler) + + def get(self, url, log=False): + req = urllib2.Request(url) + return self._open(req, log) + + def post(self, url, data="{}", content_type="application/json", log=False): + req = urllib2.Request(url, data=data) + req.add_header('Content-Type', content_type) + return self._open(req, log) + + def put(self, url, data="{}", content_type="application/json", log=False): + req = urllib2.Request(url, data=data) + req.add_header('Content-Type', content_type) + req.get_method = lambda: 'PUT' + return self._open(req, log) + + def _open(self, req, log): + try: + resp = self.opener.open(req) + content = resp.read() + except urllib2.HTTPError, error: + content = ": ".join([str(error.code), error.read()]) + if log: + logger.debug(content) + return content + + +class SSHClient(object): + class get_sudo(object): + def __init__(self, client): + self.client = client + + def __enter__(self): + self.client.sudo_mode = True + + def __exit__(self, type, value, traceback): + self.client.sudo_mode = False + + def __init__(self): + self.channel = None + self.sudo_mode = False + self.sudo = self.get_sudo(self) + self.established = False + + def connect_ssh(self, host, username, password): + if not self.established: + self.ssh_client = paramiko.SSHClient() + self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.host = host + self.username = username + self.password = password + self.ssh_client.connect(host, username=username, password=password) + self.sftp_client = self.ssh_client.open_sftp() + self.established = True + + + def execute(self, command): + logger.debug("Executing command: '%s'" % command.rstrip()) + chan = self.ssh_client.get_transport().open_session() + stdin = chan.makefile('wb') + stdout = chan.makefile('rb') + stderr = chan.makefile_stderr('rb') + cmd = "%s\n" % command + if self.sudo_mode: + cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"') + chan.exec_command(cmd) + if stdout.channel.closed is False: + stdin.write('%s\n' % self.password) + stdin.flush() + result = { + 'stdout': [], + 'stderr': [], + 'exit_status': chan.recv_exit_status() + } + for line in stdout: + result['stdout'].append(line) + for line in stderr: + result['stderr'].append(line) + + return result + + def mkdir(self, path): + logger.debug("Creating directory: %s" % path) + return self.execute("mkdir %s\n" % path) + + def rmdir(self, path): + logger.debug("Removing directory: %s" % path) + return self.execute("rm -rf %s" % path) + + def open(self, path, mode='r'): + return self.sftp_client.open(path, mode) + + def scp(self, frm, to): + logger.debug("Copying file: %s -> %s" % (frm, to)) + self.sftp_client.put(frm, to) + + def scp_d(self, frm, to): + logger.debug("Copying directory: %s -> %s" % (frm, to)) + remote_root = posixpath.join( + to, + os.path.basename(frm) + ) + for root, dirs, fls in os.walk(frm): + rel = os.path.relpath(root, frm).replace('\\','/') + if rel == ".": + curdir = remote_root + else: + curdir = posixpath.join(remote_root, rel) + self.mkdir(curdir) + for fl in fls: + self.scp( + os.path.join(root, fl), + posixpath.join(curdir, fl) + ) + + def disconnect(self): + self.sftp_client.close() + self.ssh_client.close() + self.established = False diff --git a/test/integration/__init__.py b/test/integration/__init__.py index 8c606ac9c..803f695dd 100644 --- a/test/integration/__init__.py +++ b/test/integration/__init__.py @@ -1,159 +1,159 @@ -import time, os - -from devops.model import Environment, Network, Node, Disk, Cdrom, Interface -from devops.controller import Controller -from devops.driver.libvirt import Libvirt -from devops.helpers import tcp_ping, wait, TimeoutError -import traceback - -import logging -import devops - -logger = logging.getLogger('integration') - -class Ci(object): - hostname = 'nailgun' - domain = 'mirantis.com' - installation_timeout = 1800 - chef_timeout = 600 - - def __init__(self, cache_file=None, iso=None): - self.environment_cache_file = cache_file - self.iso = iso - self.environment = None - if self.environment_cache_file and os.path.exists(self.environment_cache_file): - logger.info("Loading existing integration environment...") - with file(self.environment_cache_file) as f: - environment_id = f.read() - try: - self.environment = devops.load(environment_id) - logger.info("Successfully loaded existing environment") - except Exception, e: - logger.error("Failed to load existing integration environment: " + str(e) + "\n" + traceback.format_exc()) - pass - - def setup_environment(self): - if self.environment: - return True - - if not self.iso: - logger.critical("ISO path missing while trying to build integration environment") - return False - - logger.info("Building integration environment") - - try: - environment = Environment('integration') - - network = Network('default') - environment.networks.append(network) - - node = Node('admin') - node.memory = 2048 - node.vnc = True - node.disks.append(Disk(size=30*1024**3)) - node.interfaces.append(Interface(network)) - node.cdrom = Cdrom(isopath=self.iso) - node.boot = ['disk', 'cdrom'] - environment.nodes.append(node) - - node2 = Node('slave') - node2.memory = 2048 - node2.vnc = True - node2.disks.append(Disk(size=30*1024**3)) - node2.interfaces.append(Interface(network)) - node2.boot = ['network'] - environment.nodes.append(node2) - - devops.build(environment) - except Exception, e: - logger.error("Failed to build environment: %s\n%s" % (str(e), traceback.format_exc())) - return False - - self.environment = environment - - try: - node.interfaces[0].ip_addresses = network.ip_addresses[2] - - logger.info("Starting admin node") - node.start() - - logger.info("Waiting admin node installation software to boot") - # todo await - time.sleep(10) - - logger.info("Executing admin node software installation") - node.send_keys(""" - -/install/vmlinuz initrd=/install/initrd.gz - priority=critical - locale=en_US - file=/cdrom/preseed/manual.seed - vga=788 - netcfg/get_ipaddress=%(ip)s - netcfg/get_netmask=%(mask)s - netcfg/get_gateway=%(gw)s - netcfg/get_nameservers=%(gw)s - netcfg/confirm_static=true - netcfg/get_hostname=%(hostname)s - netcfg/get_domai=%(domain)s - -""" % { 'ip': node.ip_address, - 'mask': network.ip_addresses.netmask, - 'gw': network.ip_addresses[1], - 'hostname': self.hostname, - 'domain': self.domain}) - - logger.info("Waiting for completion of admin node software installation") - wait(lambda: tcp_ping(node.ip_address, 22), timeout=self.installation_timeout) - - logger.info("Got SSH access to admin node, waiting for ports 80 and 8000 to open") - wait(lambda: tcp_ping(node.ip_address, 80) and tcp_ping(node.ip_address, 8000), timeout=self.chef_timeout) - - logger.info("Admin node software is installed and ready for use") - - devops.save(self.environment) - - try: - os.makedirs(os.path.dirname(self.environment_cache_file)) - except OSError as e: - logger.warning("Error occured while creating directory: %s", os.path.dirname(self.environment_cache_file)) - - with file(self.environment_cache_file, 'w') as f: - f.write(self.environment.id) - - logger.info("Environment has been saved") - except Exception, e: - devops.save(self.environment) - - cache_file = self.environment_cache_file + '.candidate' - try: - os.makedirs(os.path.dirname(cache_file)) - except OSError: - logger.warning("Exception occured while making directory: %s" % os.path.dirname(cache_file)) - with file(cache_file, 'w') as f: - f.write(self.environment.id) - logger.error("Failed to build environment. Candidate environment cache file is %s" % cache_file) - return False - - return True - - def destroy_environment(self): - if self.environment: - devops.destroy(self.environment) - - if self.environment_cache_file and os.path.exists(self.environment_cache_file): - os.remove(self.environment_cache_file) - - return True - -ci = None - -def setUp(): - if not ci.setup_environment(): - raise Exception, "Failed to setup integration environment" - -def tearDown(): - if not ci.environment_cache_file: - ci.destroy_environment() - +import time, os + +from devops.model import Environment, Network, Node, Disk, Cdrom, Interface +from devops.controller import Controller +from devops.driver.libvirt import Libvirt +from devops.helpers import tcp_ping, wait, TimeoutError +import traceback + +import logging +import devops + +logger = logging.getLogger('integration') + +class Ci(object): + hostname = 'nailgun' + domain = 'mirantis.com' + installation_timeout = 1800 + chef_timeout = 600 + + def __init__(self, cache_file=None, iso=None): + self.environment_cache_file = cache_file + self.iso = iso + self.environment = None + if self.environment_cache_file and os.path.exists(self.environment_cache_file): + logger.info("Loading existing integration environment...") + with file(self.environment_cache_file) as f: + environment_id = f.read() + try: + self.environment = devops.load(environment_id) + logger.info("Successfully loaded existing environment") + except Exception, e: + logger.error("Failed to load existing integration environment: " + str(e) + "\n" + traceback.format_exc()) + pass + + def setup_environment(self): + if self.environment: + return True + + if not self.iso: + logger.critical("ISO path missing while trying to build integration environment") + return False + + logger.info("Building integration environment") + + try: + environment = Environment('integration') + + network = Network('default') + environment.networks.append(network) + + node = Node('admin') + node.memory = 2048 + node.vnc = True + node.disks.append(Disk(size=30*1024**3)) + node.interfaces.append(Interface(network)) + node.cdrom = Cdrom(isopath=self.iso) + node.boot = ['disk', 'cdrom'] + environment.nodes.append(node) + + node2 = Node('slave') + node2.memory = 2048 + node2.vnc = True + node2.disks.append(Disk(size=30*1024**3)) + node2.interfaces.append(Interface(network)) + node2.boot = ['network'] + environment.nodes.append(node2) + + devops.build(environment) + except Exception, e: + logger.error("Failed to build environment: %s\n%s" % (str(e), traceback.format_exc())) + return False + + self.environment = environment + + try: + node.interfaces[0].ip_addresses = network.ip_addresses[2] + + logger.info("Starting admin node") + node.start() + + logger.info("Waiting admin node installation software to boot") + # todo await + time.sleep(10) + + logger.info("Executing admin node software installation") + node.send_keys(""" + +/install/vmlinuz initrd=/install/initrd.gz + priority=critical + locale=en_US + file=/cdrom/preseed/manual.seed + vga=788 + netcfg/get_ipaddress=%(ip)s + netcfg/get_netmask=%(mask)s + netcfg/get_gateway=%(gw)s + netcfg/get_nameservers=%(gw)s + netcfg/confirm_static=true + netcfg/get_hostname=%(hostname)s + netcfg/get_domai=%(domain)s + +""" % { 'ip': node.ip_address, + 'mask': network.ip_addresses.netmask, + 'gw': network.ip_addresses[1], + 'hostname': self.hostname, + 'domain': self.domain}) + + logger.info("Waiting for completion of admin node software installation") + wait(lambda: tcp_ping(node.ip_address, 22), timeout=self.installation_timeout) + + logger.info("Got SSH access to admin node, waiting for ports 80 and 8000 to open") + wait(lambda: tcp_ping(node.ip_address, 80) and tcp_ping(node.ip_address, 8000), timeout=self.chef_timeout) + + logger.info("Admin node software is installed and ready for use") + + devops.save(self.environment) + + try: + os.makedirs(os.path.dirname(self.environment_cache_file)) + except OSError as e: + logger.warning("Error occured while creating directory: %s", os.path.dirname(self.environment_cache_file)) + + with file(self.environment_cache_file, 'w') as f: + f.write(self.environment.id) + + logger.info("Environment has been saved") + except Exception, e: + devops.save(self.environment) + + cache_file = self.environment_cache_file + '.candidate' + try: + os.makedirs(os.path.dirname(cache_file)) + except OSError: + logger.warning("Exception occured while making directory: %s" % os.path.dirname(cache_file)) + with file(cache_file, 'w') as f: + f.write(self.environment.id) + logger.error("Failed to build environment. Candidate environment cache file is %s" % cache_file) + return False + + return True + + def destroy_environment(self): + if self.environment: + devops.destroy(self.environment) + + if self.environment_cache_file and os.path.exists(self.environment_cache_file): + os.remove(self.environment_cache_file) + + return True + +ci = None + +def setUp(): + if not ci.setup_environment(): + raise Exception, "Failed to setup integration environment" + +def tearDown(): + if not ci.environment_cache_file: + ci.destroy_environment() + diff --git a/test/integration/base.py b/test/integration/base.py index 5e958bc6b..a0523780d 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -1,22 +1,22 @@ -from unittest.case import TestCase -from integration import ci -from helpers import HTTPClient -import logging -logging.basicConfig(format=':%(lineno)d: %(asctime)s %(message)s', level=logging.DEBUG) - -class Base(TestCase): - - client = HTTPClient() - - def get_admin_node_ip(self): - if ci is not None: - return ci.environment.node['admin'].ip_address - else: - return "10.20.0.2" - - def get_id_by_mac(self, mac_address): - return mac_address.replace(":", "").upper() - -# def getSlaveNodeIp(self, getIdByMac): - - +from unittest.case import TestCase +from integration import ci +from helpers import HTTPClient +import logging +logging.basicConfig(format=':%(lineno)d: %(asctime)s %(message)s', level=logging.DEBUG) + +class Base(TestCase): + + client = HTTPClient() + + def get_admin_node_ip(self): + if ci is not None: + return ci.environment.node['admin'].ip_address + else: + return "10.20.0.2" + + def get_id_by_mac(self, mac_address): + return mac_address.replace(":", "").upper() + +# def getSlaveNodeIp(self, getIdByMac): + + diff --git a/test/integration/test_cobbler.py b/test/integration/test_cobbler.py index a03715e66..d6308f2ab 100644 --- a/test/integration/test_cobbler.py +++ b/test/integration/test_cobbler.py @@ -1,47 +1,47 @@ -import logging -import xmlrpclib -from time import sleep - - - -from devops.helpers import wait, tcp_ping, http, ssh -from integration.base import Base -from helpers import SSHClient - - - -class TestCobbler(Base): - def __init__(self, *args, **kwargs): - super(TestCobbler, self).__init__(*args, **kwargs) - self.remote = SSHClient() - self.logpath = "/var/log/chef/solo.log" - self.str_success = "Report handlers complete" - - - def setUp(self): - self.ip = self.get_admin_node_ip() - - - def tearDown(self): - pass - - - def test_cobbler_alive(self): - logging.info("Waiting for handlers to complete") - - self.remote.connect_ssh(str(self.ip), "ubuntu", "r00tme") - count = 0 - while True: - res = self.remote.execute("grep '%s' '%s'" % (self.str_success, self.logpath)) - count += 1 - if not res['exit_status']: - break - sleep(5) - if count == 200: - raise Exception("Chef handlers failed to complete") - self.remote.disconnect() - - wait(lambda: http(host=self.ip, url='/cobbler_api', waited_code=501), timeout=60) - server = xmlrpclib.Server('http://%s/cobbler_api' % self.ip) - token = server.login('cobbler', 'cobbler') - assert server.ping() == True +import logging +import xmlrpclib +from time import sleep + + + +from devops.helpers import wait, tcp_ping, http, ssh +from integration.base import Base +from helpers import SSHClient + + + +class TestCobbler(Base): + def __init__(self, *args, **kwargs): + super(TestCobbler, self).__init__(*args, **kwargs) + self.remote = SSHClient() + self.logpath = "/var/log/chef/solo.log" + self.str_success = "Report handlers complete" + + + def setUp(self): + self.ip = self.get_admin_node_ip() + + + def tearDown(self): + pass + + + def test_cobbler_alive(self): + logging.info("Waiting for handlers to complete") + + self.remote.connect_ssh(str(self.ip), "ubuntu", "r00tme") + count = 0 + while True: + res = self.remote.execute("grep '%s' '%s'" % (self.str_success, self.logpath)) + count += 1 + if not res['exit_status']: + break + sleep(5) + if count == 200: + raise Exception("Chef handlers failed to complete") + self.remote.disconnect() + + wait(lambda: http(host=self.ip, url='/cobbler_api', waited_code=501), timeout=60) + server = xmlrpclib.Server('http://%s/cobbler_api' % self.ip) + token = server.login('cobbler', 'cobbler') + assert server.ping() == True diff --git a/test/integration/test_node.py b/test/integration/test_node.py index 7bc486767..a4ea4c6ee 100644 --- a/test/integration/test_node.py +++ b/test/integration/test_node.py @@ -1,280 +1,280 @@ -import shlex -import os -import sys -import logging -import time -import json -import urllib2 -import pprint -from unittest import TestCase -from subprocess import Popen, PIPE -#import posixpath -import paramiko -import posixpath -from devops.helpers import wait, tcp_ping, http -from integration import ci - -from integration.base import Base -from helpers import HTTPClient, SSHClient -from root import root - -logging.basicConfig(format=':%(lineno)d: %(asctime)s %(message)s', level=logging.DEBUG) - -AGENT_PATH = root("bin", "agent") -DEPLOY_PATH = root("bin", "deploy") -COOKBOOKS_PATH = root("cooks", "cookbooks") -SAMPLE_PATH = root("scripts", "ci") -SAMPLE_REMOTE_PATH = "/home/ubuntu" - - -class StillPendingException(Exception): - pass - - -class TestNode(Base): - def __init__(self, *args, **kwargs): - super(TestNode, self).__init__(*args, **kwargs) - self.remote = SSHClient() - self.admin_host = None - self.admin_user = "ubuntu" - self.admin_passwd = "r00tme" - self.slave_host = None - self.slave_user = "root" - self.slave_passwd = "r00tme" - self.release_id = None - - def setUp(self): - self.ip = self.get_admin_node_ip() - self.admin_host = self.ip - cookbook_remote_path = posixpath.join(SAMPLE_REMOTE_PATH, "sample-cook") - mysql_remote_path = posixpath.join(COOKBOOKS_PATH, "mysql") - release_remote_path = posixpath.join(SAMPLE_REMOTE_PATH, "sample-release.json") - self.remote.connect_ssh(self.admin_host, self.admin_user, self.admin_passwd) - self.remote.rmdir(cookbook_remote_path) - self.remote.rmdir(os.path.join(SAMPLE_REMOTE_PATH, "cookbooks")) - self.remote.rmdir(os.path.join(SAMPLE_REMOTE_PATH, "solo")) - self.remote.scp( - os.path.join(SAMPLE_PATH, "sample-release.json"), - release_remote_path - ) - self.remote.mkdir(os.path.join(SAMPLE_REMOTE_PATH, "solo")) - self.remote.mkdir(os.path.join(SAMPLE_REMOTE_PATH, "solo/config")) - self.remote.scp_d( - os.path.join(SAMPLE_PATH, "sample-cook"), - SAMPLE_REMOTE_PATH - ) - self.remote.scp_d( - COOKBOOKS_PATH, - SAMPLE_REMOTE_PATH - ) - - attempts = 0 - while True: - releases = json.loads(self.client.get( - "http://%s:8000/api/releases/" % self.admin_host - )) - - for r in releases: - logging.debug("Found release name: %s" % r["name"]) - if r["name"] == "Sample release": - logging.debug("Sample release id: %s" % r["id"]) - self.release_id = r["id"] - break - - if self.release_id: - break - - if attempts >= 1: - raise Exception("Release is not found") - - logging.error("Sample release is not found. Trying to upload") - with self.remote.sudo: - cmd = "/opt/nailgun/bin/create_release -f %s" % \ - release_remote_path - logging.info("Launching command: %s" % cmd) - res = self.remote.execute(cmd) - if res['exit_status'] != 0: - self.remote.disconnect() - raise Exception("Command failed: %s" % str(res)) - attempts += 1 - -# todo install_cookbook always return 0 - commands = [ - "/opt/nailgun/bin/install_cookbook %s" % cookbook_remote_path - ] - with self.remote.sudo: - for cmd in commands: - logging.info("Launching command: %s" % cmd) - res = self.remote.execute(cmd) - logging.debug("Command result: %s" % pprint.pformat(res)) - if res['exit_status']: - self.remote.disconnect() - raise Exception("Command failed: %s" % str(res)) - - self.remote.disconnect() - - def test_node_deploy(self): - try: - self.get_slave_id() - except : - pass - timer = time.time() - timeout = 600 - while True: - node = self.get_slave_node(self.get_slave_id()) - if node is not None: - logging.info("Node found") - self.slave_host = node["ip"] - break - else: - logging.info("Node not found") - if (time.time() - timer) > timeout: - raise Exception("Slave node agent failed to execute!") - time.sleep(15) - logging.info("Waiting for slave agent to run...") - - try: - cluster = json.loads(self.client.get( - "http://%s:8000/api/clusters/1" % self.admin_host - )) - except ValueError: - logging.info("No clusters found - creating test cluster...") - cluster = self.client.post( - "http://%s:8000/api/clusters" % self.admin_host, - data='{ "name": "MyOwnPrivateCluster", "release": %s }' % \ - self.release_id, log=True - ) - cluster = json.loads(cluster) - - resp = json.loads(self.client.put( - "http://%s:8000/api/clusters/1" % self.admin_host, - data='{ "nodes": ["%s"] }' % self.slave_id - )) - - cluster = json.loads(self.client.get( - "http://%s:8000/api/clusters/1" % self.admin_host - )) - if not len(cluster["nodes"]): - raise ValueError("Failed to add node into cluster") - - roles_uploaded = json.loads(self.client.get( - "http://%s:8000/api/roles?release_id=%s" % \ - (self.admin_host, self.release_id) - )) - - """ - FIXME - WILL BE CHANGED WHEN RENDERING WILL BE REWRITTEN - """ - roles_ids = [ - role["id"] for role in roles_uploaded - ] - - resp = json.loads(self.client.put( - "http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id), - data='{ "new_roles": %s, "redeployment_needed": true }' % str(roles_ids) - )) - if not len(resp["new_roles"]): - raise ValueError("Failed to assign roles to node") - - if node["status"] == "discover": - logging.info("Node booted with bootstrap image.") - elif node["status"] == "ready": - logging.info("Node already installed.") - self._slave_delete_test_file() - - logging.info("Provisioning...") - task = json.loads(self.client.put( - "http://%s:8000/api/clusters/1/changes/" % self.admin_host, - log=True - )) - task_id = task['task_id'] - logging.info("Task created: %s" % task_id) - logging.info("Waiting for completion of slave node software installation") - timer = time.time() - timeout = 1800 - while True: - try: - task = self.client.get( - "http://%s:8000/api/tasks/%s/" % (self.admin_host, task_id) - ) - logging.info(str(task)) - task = json.loads(task) - if not task['ready']: - raise StillPendingException("Task %s is still pending") - if task.get('error'): - raise Exception( - "Task %s failed!\n %s" % - (task['task_id'], str(task)), - ) - break - except StillPendingException: - if (time.time() - timer) > timeout: - raise Exception("Task pending timeout!") - time.sleep(30) - - node = json.loads(self.client.get( - "http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id) - )) - self.slave_host = node["ip"] - - logging.info("Waiting for SSH access on %s" % self.slave_host) - wait(lambda: tcp_ping(self.slave_host, 22), timeout=1800) - self.remote.connect_ssh(self.slave_host, self.slave_user, self.slave_passwd) - - # check if recipes executed - ret = self.remote.execute("test -f /tmp/chef_success") - if ret['exit_status']: - raise Exception("Recipes failed to execute!") - - # check mysql running - #db = MySQLdb.connect(passwd="test", user="root", host=self.slave_host) - #print db - - # check recipes execution order - ret = self.remote.execute("cat /tmp/chef_success") - if [out.strip() for out in ret['stdout']] != ['monitor', 'default', 'compute']: - raise Exception("Recipes executed in a wrong order: %s!" \ - % str(ret['stdout'])) - - # chech node status - node = json.loads(self.client.get( - "http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id) - )) - self.assertEqual(node["status"], "ready") - self.remote.disconnect() - - def _slave_delete_test_file(self): - logging.info("Deleting test file...") - slave_client = SSHClient() - slave_client.connect_ssh(self.slave_host, self.slave_user, self.slave_passwd) - res = slave_client.execute("rm -rf /tmp/chef_success") - slave_client.disconnect() - -# create node with predefined mac address - def get_slave_id(self): - if hasattr(self,"slave_id"): return self.slave_id - if ci is not None: - slave = ci.environment.node['slave'] - slave_id = self.get_id_by_mac(slave.interfaces[0].mac_address) - logging.info("Starting slave node") - slave.start() - logging.info("Nailgun IP: %s" % self.admin_host) - else: - response = self.client.get( - "http://%s:8000/api/nodes" % self.admin_host - ) - last_node = json.loads(response)[-1] - slave_id = self.get_id_by_mac(last_node['mac']) - self.slave_id = slave_id - - return slave_id - - def get_slave_node(self, slave_id): - response = self.client.get( - "http://%s:8000/api/nodes/%s" % (self.admin_host, slave_id) - ) - - if response.startswith("404"): - return None - return json.loads(response) +import shlex +import os +import sys +import logging +import time +import json +import urllib2 +import pprint +from unittest import TestCase +from subprocess import Popen, PIPE +#import posixpath +import paramiko +import posixpath +from devops.helpers import wait, tcp_ping, http +from integration import ci + +from integration.base import Base +from helpers import HTTPClient, SSHClient +from root import root + +logging.basicConfig(format=':%(lineno)d: %(asctime)s %(message)s', level=logging.DEBUG) + +AGENT_PATH = root("bin", "agent") +DEPLOY_PATH = root("bin", "deploy") +COOKBOOKS_PATH = root("cooks", "cookbooks") +SAMPLE_PATH = root("scripts", "ci") +SAMPLE_REMOTE_PATH = "/home/ubuntu" + + +class StillPendingException(Exception): + pass + + +class TestNode(Base): + def __init__(self, *args, **kwargs): + super(TestNode, self).__init__(*args, **kwargs) + self.remote = SSHClient() + self.admin_host = None + self.admin_user = "ubuntu" + self.admin_passwd = "r00tme" + self.slave_host = None + self.slave_user = "root" + self.slave_passwd = "r00tme" + self.release_id = None + + def setUp(self): + self.ip = self.get_admin_node_ip() + self.admin_host = self.ip + cookbook_remote_path = posixpath.join(SAMPLE_REMOTE_PATH, "sample-cook") + mysql_remote_path = posixpath.join(COOKBOOKS_PATH, "mysql") + release_remote_path = posixpath.join(SAMPLE_REMOTE_PATH, "sample-release.json") + self.remote.connect_ssh(self.admin_host, self.admin_user, self.admin_passwd) + self.remote.rmdir(cookbook_remote_path) + self.remote.rmdir(os.path.join(SAMPLE_REMOTE_PATH, "cookbooks")) + self.remote.rmdir(os.path.join(SAMPLE_REMOTE_PATH, "solo")) + self.remote.scp( + os.path.join(SAMPLE_PATH, "sample-release.json"), + release_remote_path + ) + self.remote.mkdir(os.path.join(SAMPLE_REMOTE_PATH, "solo")) + self.remote.mkdir(os.path.join(SAMPLE_REMOTE_PATH, "solo/config")) + self.remote.scp_d( + os.path.join(SAMPLE_PATH, "sample-cook"), + SAMPLE_REMOTE_PATH + ) + self.remote.scp_d( + COOKBOOKS_PATH, + SAMPLE_REMOTE_PATH + ) + + attempts = 0 + while True: + releases = json.loads(self.client.get( + "http://%s:8000/api/releases/" % self.admin_host + )) + + for r in releases: + logging.debug("Found release name: %s" % r["name"]) + if r["name"] == "Sample release": + logging.debug("Sample release id: %s" % r["id"]) + self.release_id = r["id"] + break + + if self.release_id: + break + + if attempts >= 1: + raise Exception("Release is not found") + + logging.error("Sample release is not found. Trying to upload") + with self.remote.sudo: + cmd = "/opt/nailgun/bin/create_release -f %s" % \ + release_remote_path + logging.info("Launching command: %s" % cmd) + res = self.remote.execute(cmd) + if res['exit_status'] != 0: + self.remote.disconnect() + raise Exception("Command failed: %s" % str(res)) + attempts += 1 + +# todo install_cookbook always return 0 + commands = [ + "/opt/nailgun/bin/install_cookbook %s" % cookbook_remote_path + ] + with self.remote.sudo: + for cmd in commands: + logging.info("Launching command: %s" % cmd) + res = self.remote.execute(cmd) + logging.debug("Command result: %s" % pprint.pformat(res)) + if res['exit_status']: + self.remote.disconnect() + raise Exception("Command failed: %s" % str(res)) + + self.remote.disconnect() + + def test_node_deploy(self): + try: + self.get_slave_id() + except : + pass + timer = time.time() + timeout = 600 + while True: + node = self.get_slave_node(self.get_slave_id()) + if node is not None: + logging.info("Node found") + self.slave_host = node["ip"] + break + else: + logging.info("Node not found") + if (time.time() - timer) > timeout: + raise Exception("Slave node agent failed to execute!") + time.sleep(15) + logging.info("Waiting for slave agent to run...") + + try: + cluster = json.loads(self.client.get( + "http://%s:8000/api/clusters/1" % self.admin_host + )) + except ValueError: + logging.info("No clusters found - creating test cluster...") + cluster = self.client.post( + "http://%s:8000/api/clusters" % self.admin_host, + data='{ "name": "MyOwnPrivateCluster", "release": %s }' % \ + self.release_id, log=True + ) + cluster = json.loads(cluster) + + resp = json.loads(self.client.put( + "http://%s:8000/api/clusters/1" % self.admin_host, + data='{ "nodes": ["%s"] }' % self.slave_id + )) + + cluster = json.loads(self.client.get( + "http://%s:8000/api/clusters/1" % self.admin_host + )) + if not len(cluster["nodes"]): + raise ValueError("Failed to add node into cluster") + + roles_uploaded = json.loads(self.client.get( + "http://%s:8000/api/roles?release_id=%s" % \ + (self.admin_host, self.release_id) + )) + + """ + FIXME + WILL BE CHANGED WHEN RENDERING WILL BE REWRITTEN + """ + roles_ids = [ + role["id"] for role in roles_uploaded + ] + + resp = json.loads(self.client.put( + "http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id), + data='{ "new_roles": %s, "redeployment_needed": true }' % str(roles_ids) + )) + if not len(resp["new_roles"]): + raise ValueError("Failed to assign roles to node") + + if node["status"] == "discover": + logging.info("Node booted with bootstrap image.") + elif node["status"] == "ready": + logging.info("Node already installed.") + self._slave_delete_test_file() + + logging.info("Provisioning...") + task = json.loads(self.client.put( + "http://%s:8000/api/clusters/1/changes/" % self.admin_host, + log=True + )) + task_id = task['task_id'] + logging.info("Task created: %s" % task_id) + logging.info("Waiting for completion of slave node software installation") + timer = time.time() + timeout = 1800 + while True: + try: + task = self.client.get( + "http://%s:8000/api/tasks/%s/" % (self.admin_host, task_id) + ) + logging.info(str(task)) + task = json.loads(task) + if not task['ready']: + raise StillPendingException("Task %s is still pending") + if task.get('error'): + raise Exception( + "Task %s failed!\n %s" % + (task['task_id'], str(task)), + ) + break + except StillPendingException: + if (time.time() - timer) > timeout: + raise Exception("Task pending timeout!") + time.sleep(30) + + node = json.loads(self.client.get( + "http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id) + )) + self.slave_host = node["ip"] + + logging.info("Waiting for SSH access on %s" % self.slave_host) + wait(lambda: tcp_ping(self.slave_host, 22), timeout=1800) + self.remote.connect_ssh(self.slave_host, self.slave_user, self.slave_passwd) + + # check if recipes executed + ret = self.remote.execute("test -f /tmp/chef_success") + if ret['exit_status']: + raise Exception("Recipes failed to execute!") + + # check mysql running + #db = MySQLdb.connect(passwd="test", user="root", host=self.slave_host) + #print db + + # check recipes execution order + ret = self.remote.execute("cat /tmp/chef_success") + if [out.strip() for out in ret['stdout']] != ['monitor', 'default', 'compute']: + raise Exception("Recipes executed in a wrong order: %s!" \ + % str(ret['stdout'])) + + # chech node status + node = json.loads(self.client.get( + "http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id) + )) + self.assertEqual(node["status"], "ready") + self.remote.disconnect() + + def _slave_delete_test_file(self): + logging.info("Deleting test file...") + slave_client = SSHClient() + slave_client.connect_ssh(self.slave_host, self.slave_user, self.slave_passwd) + res = slave_client.execute("rm -rf /tmp/chef_success") + slave_client.disconnect() + +# create node with predefined mac address + def get_slave_id(self): + if hasattr(self,"slave_id"): return self.slave_id + if ci is not None: + slave = ci.environment.node['slave'] + slave_id = self.get_id_by_mac(slave.interfaces[0].mac_address) + logging.info("Starting slave node") + slave.start() + logging.info("Nailgun IP: %s" % self.admin_host) + else: + response = self.client.get( + "http://%s:8000/api/nodes" % self.admin_host + ) + last_node = json.loads(response)[-1] + slave_id = self.get_id_by_mac(last_node['mac']) + self.slave_id = slave_id + + return slave_id + + def get_slave_node(self, slave_id): + response = self.client.get( + "http://%s:8000/api/nodes/%s" % (self.admin_host, slave_id) + ) + + if response.startswith("404"): + return None + return json.loads(response) diff --git a/test/integration_test.py b/test/integration_test.py index 024dfed96..5f700e5c7 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -1,82 +1,82 @@ -import os.path -import sys -import logging -import argparse -from nose.plugins.manager import PluginManager -from nose.plugins.xunit import Xunit -from root import root - - -sys.path[:0] = [ - root('devops'), -] - -import cookbooks -import integration - -def main(): - parser = argparse.ArgumentParser(description="Integration test suite") - parser.add_argument("-i", "--iso", dest="iso", - help="iso image path or http://url") - parser.add_argument("-l", "--level", dest="log_level", type=str, - help="log level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], - default="ERROR", metavar="LEVEL") - parser.add_argument('--cache-file', dest='cache_file', type=str, - help='file to store integration environment name') - parser.add_argument('--installation-timeout', dest='installation_timeout', type=int, - help='admin node installation timeout') - parser.add_argument('--chef-timeout', dest='chef_timeout', type=int, - help='admin node chef timeout') - parser.add_argument('--suite', dest='test_suite', type=str, - help='Test suite to run', choices=["integration", "cookbooks"], - default="integration") - parser.add_argument('command', choices=('setup', 'destroy', 'test'), default='test', - help="command to execute") - parser.add_argument('arguments', nargs=argparse.REMAINDER, help='arguments for nose testing framework') - - params = parser.parse_args() - - numeric_level = getattr(logging, params.log_level.upper()) - logging.basicConfig(level=numeric_level) - paramiko_logger = logging.getLogger('paramiko') - paramiko_logger.setLevel(numeric_level+1) - - if params.test_suite == 'integration': - suite = integration - elif params.test_suite == 'cookbooks': - suite = cookbooks - - suite.ci = suite.Ci(params.cache_file, params.iso) - suite.ci.installation_timeout = getattr(params, 'installation_timeout', 1800) - suite.ci.chef_timeout = getattr(params, 'chef_timeout', 600) - - if params.command == 'setup': - result = suite.ci.setup_environment() - elif params.command == 'destroy': - result = suite.ci.destroy_environment() - elif params.command == 'test': - import nose - import nose.config - - nc = nose.config.Config() - nc.verbosity = 3 - nc.plugins = PluginManager(plugins=[Xunit()]) - # Set folder where to process tests - nc.configureWhere(os.path.join(os.path.dirname(os.path.abspath(__file__)), params.test_suite)) - nose.main(module=suite, config=nc, argv=[ - __file__, - "--with-xunit", - "--xunit-file=nosetests.xml" - ]+params.arguments) - result = True - else: - print("Unknown command '%s'" % params.command) - sys.exit(1) - - if not result: - sys.exit(1) - - -if __name__ == "__main__": - main() - +import os.path +import sys +import logging +import argparse +from nose.plugins.manager import PluginManager +from nose.plugins.xunit import Xunit +from root import root + + +sys.path[:0] = [ + root('devops'), +] + +import cookbooks +import integration + +def main(): + parser = argparse.ArgumentParser(description="Integration test suite") + parser.add_argument("-i", "--iso", dest="iso", + help="iso image path or http://url") + parser.add_argument("-l", "--level", dest="log_level", type=str, + help="log level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], + default="ERROR", metavar="LEVEL") + parser.add_argument('--cache-file', dest='cache_file', type=str, + help='file to store integration environment name') + parser.add_argument('--installation-timeout', dest='installation_timeout', type=int, + help='admin node installation timeout') + parser.add_argument('--chef-timeout', dest='chef_timeout', type=int, + help='admin node chef timeout') + parser.add_argument('--suite', dest='test_suite', type=str, + help='Test suite to run', choices=["integration", "cookbooks"], + default="integration") + parser.add_argument('command', choices=('setup', 'destroy', 'test'), default='test', + help="command to execute") + parser.add_argument('arguments', nargs=argparse.REMAINDER, help='arguments for nose testing framework') + + params = parser.parse_args() + + numeric_level = getattr(logging, params.log_level.upper()) + logging.basicConfig(level=numeric_level) + paramiko_logger = logging.getLogger('paramiko') + paramiko_logger.setLevel(numeric_level+1) + + if params.test_suite == 'integration': + suite = integration + elif params.test_suite == 'cookbooks': + suite = cookbooks + + suite.ci = suite.Ci(params.cache_file, params.iso) + suite.ci.installation_timeout = getattr(params, 'installation_timeout', 1800) + suite.ci.chef_timeout = getattr(params, 'chef_timeout', 600) + + if params.command == 'setup': + result = suite.ci.setup_environment() + elif params.command == 'destroy': + result = suite.ci.destroy_environment() + elif params.command == 'test': + import nose + import nose.config + + nc = nose.config.Config() + nc.verbosity = 3 + nc.plugins = PluginManager(plugins=[Xunit()]) + # Set folder where to process tests + nc.configureWhere(os.path.join(os.path.dirname(os.path.abspath(__file__)), params.test_suite)) + nose.main(module=suite, config=nc, argv=[ + __file__, + "--with-xunit", + "--xunit-file=nosetests.xml" + ]+params.arguments) + result = True + else: + print("Unknown command '%s'" % params.command) + sys.exit(1) + + if not result: + sys.exit(1) + + +if __name__ == "__main__": + main() +