Devops - fixed paths and dependencies

This commit is contained in:
vic 2012-08-28 15:34:49 +04:00 committed by BeachHead Jenkins CI
parent b8e96ecbe5
commit 60b110a75e
15 changed files with 1997 additions and 1997 deletions

View File

@ -1,2 +1,2 @@
from .main import *
from .main import *

View File

@ -1,31 +1,31 @@
from main import *
def saved(args):
c = getController()
for saved_env in c.saved_environments:
print(saved_env)
def resume(args):
parser = argparse.ArgumentParser(prog='devops resume')
parser.add_argument('environment')
arguments = parser.parse_args(args)
env = load(arguments.environment)
import code
code.InteractiveConsole(locals={'environment': env}).interact()
import sys
import argparse
parser = argparse.ArgumentParser(prog='devops')
parser.add_argument('command', choices=['saved', 'resume'])
parser.add_argument('command_args', nargs=argparse.REMAINDER)
arguments = parser.parse_args()
if arguments.command == 'saved':
saved(arguments.command_args)
elif arguments.command == 'resume':
resume(arguments.command_args)
else:
help()
sys.exit(1)
from main import *
def saved(args):
c = getController()
for saved_env in c.saved_environments:
print(saved_env)
def resume(args):
parser = argparse.ArgumentParser(prog='devops resume')
parser.add_argument('environment')
arguments = parser.parse_args(args)
env = load(arguments.environment)
import code
code.InteractiveConsole(locals={'environment': env}).interact()
import sys
import argparse
parser = argparse.ArgumentParser(prog='devops')
parser.add_argument('command', choices=['saved', 'resume'])
parser.add_argument('command_args', nargs=argparse.REMAINDER)
arguments = parser.parse_args()
if arguments.command == 'saved':
saved(arguments.command_args)
elif arguments.command == 'resume':
resume(arguments.command_args)
else:
help()
sys.exit(1)

View File

@ -1,296 +1,296 @@
import os
import sys
import stat
import tempfile
import shutil
import urllib
import ipaddr
import glob
import random
import string
import re
import time
from model import Node, Network
from network import IpNetworksPool
from error import DevopsError
import my_yaml
import logging
logger = logging.getLogger('devops.controller')
def randstr(length=8):
return ''.join(random.choice(string.ascii_letters) for i in xrange(length))
class Controller:
def __init__(self, driver):
self.driver = driver
self.networks_pool = IpNetworksPool()
self._reserve_networks()
self.home_dir = os.environ.get('DEVOPS_HOME') or os.path.join(os.environ['HOME'], ".devops")
try:
os.makedirs(os.path.join(self.home_dir, 'environments'), 0755)
except OSError:
sys.exc_clear()
def build_environment(self, environment):
logger.info("Building environment %s" % environment.name)
env_id = getattr(environment, 'id', '-'.join([environment.name, randstr()]))
environment.id = env_id
logger.debug("Creating environment working directory for %s environment" % environment.name)
environment.work_dir = os.path.join(self.home_dir, 'environments', environment.id)
os.mkdir(environment.work_dir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
logger.debug("Environment working directory has been created: %s" % environment.work_dir)
environment.driver = self.driver
for node in environment.nodes:
for interface in node.interfaces:
interface.node = node
interface.network.interfaces.append(interface)
for disk in node.disks:
if disk.base_image and disk.base_image.find('://') != -1:
disk.base_image = self._cache_file(disk.base_image)
if node.cdrom:
if node.cdrom.isopath.find('://') != -1:
node.cdrom.isopath = self._cache_file(node.cdrom.isopath)
for network in environment.networks:
logger.info("Building network %s" % network.name)
network.ip_addresses = self.networks_pool.get()
if network.pxe:
network.dhcp_server = True
tftp_path = os.path.join(environment.work_dir, "tftp")
if not os.path.exists(tftp_path):
os.mkdir(tftp_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
network.tftp_root_dir = tftp_path
if network.dhcp_server:
allocated_addresses = []
for interface in network.interfaces:
for address in interface.ip_addresses:
if address in network.ip_addresses:
allocated_addresses.append(address)
next_address_index = 2
for interface in network.interfaces:
if len(interface.ip_addresses) == 0:
while next_address_index < network.ip_addresses.numhosts and \
network.ip_addresses[next_address_index] in allocated_addresses:
next_address_index += 1
if next_address_index >= network.ip_addresses.numhosts:
raise DevopsError, "Failed to allocated IP address for node '%s' in network '%s': no more addresses left" % (node.name, network.name)
address = network.ip_addresses[next_address_index]
interface.ip_addresses.append(address)
allocated_addresses.append(next_address_index)
next_address_index += 1
network.dhcp_dynamic_address_start = network.ip_addresses[next_address_index]
network.dhcp_dynamic_address_end = network.ip_addresses[network.ip_addresses.numhosts-2]
for network in environment.networks:
logger.info("Building network %s" % network.name)
self.driver.create_network(network)
network.driver = self.driver
for node in environment.nodes:
logger.info("Building node %s" % node.name)
self._build_node(environment, node)
node.driver = self.driver
for network in environment.networks:
self.driver.create_network(network)
network.start()
environment.built = True
logger.info("Finished building environment %s" % environment.name)
def destroy_environment(self, environment):
logger.info("Destroying environment %s" % environment.name)
for node in environment.nodes:
logger.info("Destroying node %s" % node.name)
node.stop()
for snapshot in node.snapshots:
self.driver.delete_snapshot(node, snapshot)
self.driver.delete_node(node)
del node.driver
for network in environment.networks:
logger.info("Destroying network %s" % network.name)
network.stop()
self.driver.delete_network(network)
del network.driver
# FIXME
try:
self.networks_pool.put(network.ip_addresses)
except:
pass
del environment.driver
logger.info("Removing environment %s files" % environment.name)
shutil.rmtree(environment.work_dir)
logger.info("Finished destroying environment %s" % environment.name)
def load_environment(self, environment_id):
env_work_dir = os.path.join(self.home_dir, 'environments', environment_id)
env_config_file = os.path.join(env_work_dir, 'config')
if not os.path.exists(env_config_file):
raise DevopsError, "Environment '%s' couldn't be found" % environment_id
with file(env_config_file) as f:
data = f.read()
environment = my_yaml.load(data)
return environment
def save_environment(self, environment):
data = my_yaml.dump(environment)
if not environment.built:
raise DevopsError, "Environment has not been built yet."
with file(os.path.join(environment.work_dir, 'config'), 'w') as f:
f.write(data)
@property
def saved_environments(self):
saved_environments = []
for path in glob.glob(os.path.join(self.home_dir, 'environments', '*')):
if os.path.exists(os.path.join(path, 'config')):
saved_environments.append(os.path.basename(path))
return saved_environments
def _reserve_networks(self):
logger.debug("Scanning for ip networks that are already taken")
with os.popen("ip route") as f:
for line in f:
words = line.split()
if len(words) == 0:
continue
if words[0] == 'default':
continue
address = ipaddr.IPv4Network(words[0])
logger.debug("Reserving ip network %s" % address)
self.networks_pool.reserve(address)
logger.debug("Finished scanning for taken ip networks")
def _build_node(self, environment, node):
for disk in filter(lambda d: d.path is None, node.disks):
logger.debug("Creating disk file for node '%s'" % node.name)
fd, disk.path = tempfile.mkstemp(
prefix=environment.work_dir + '/disk',
suffix='.' + disk.format
)
os.close(fd)
os.chmod(disk.path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)
self.driver.create_disk(disk)
logger.debug("Creating node '%s'" % node.name)
self.driver.create_node(node)
def _cache_file(self, url):
cache_dir = os.path.join(self.home_dir, 'cache')
if not os.path.exists(cache_dir):
os.makedirs(cache_dir, 0755)
cache_log_path = os.path.join(cache_dir, 'entries')
if os.path.exists(cache_log_path):
with file(cache_log_path) as f:
cache_entries = my_yaml.load(f.read())
else:
cache_entries = dict()
ext_cache_log_path = os.path.join(cache_dir, 'extended_entries')
if os.path.exists(ext_cache_log_path):
with file(ext_cache_log_path) as f:
extended_cache_entries = my_yaml.load(f.read())
else:
extended_cache_entries = dict()
for key, value in cache_entries.items():
if not extended_cache_entries.has_key(key):
extended_cache_entries[key] = {'cached-path':value}
RFC1123_DATETIME_FORMAT = '%a, %d %b %Y %H:%M:%S %Z'
url_attrs = {}
cached_path = ''
local_mtime = 0
if extended_cache_entries.has_key(url):
url_attrs = extended_cache_entries[url]
cached_path = url_attrs['cached-path']
if url_attrs.has_key('last-modified'):
local_mtime = time.mktime(
time.strptime(url_attrs['last-modified'], RFC1123_DATETIME_FORMAT))
else:
logger.debug("Cache miss for '%s', downloading" % url)
remote = urllib.urlopen(url)
msg = remote.info()
if msg.has_key('last-modified'):
url_mtime = time.mktime(
time.strptime(msg['last-modified'], RFC1123_DATETIME_FORMAT))
else:
url_mtime = 0
if local_mtime >= url_mtime:
logger.debug("Cache is up to date for '%s': '%s'" % (
url, cached_path))
return cached_path
elif cached_path != '':
logger.debug("Cache is old for '%s', downloading" % url)
if not os.access(cached_path, os.W_OK):
try:
os.unlink(cached_path)
except Exception:
pass
fd, cached_path = tempfile.mkstemp(prefix=cache_dir+'/')
os.close(fd)
with file(cached_path, 'w') as f:
shutil.copyfileobj(remote, f)
os.chmod(cached_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
if msg.has_key('last-modified'):
url_attrs['last-modified'] = msg['last-modified']
url_attrs['cached-path'] = cached_path
extended_cache_entries[url] = url_attrs
with file(ext_cache_log_path, 'w') as f:
f.write(my_yaml.dump(extended_cache_entries))
cache_entries[url] = cached_path
with file(cache_log_path, 'w') as f:
f.write(my_yaml.dump(cache_entries))
logger.debug("Cached '%s' to '%s'" % (url, cached_path))
return cached_path
import os
import sys
import stat
import tempfile
import shutil
import urllib
import ipaddr
import glob
import random
import string
import re
import time
from model import Node, Network
from network import IpNetworksPool
from error import DevopsError
import my_yaml
import logging
logger = logging.getLogger('devops.controller')
def randstr(length=8):
return ''.join(random.choice(string.ascii_letters) for i in xrange(length))
class Controller:
def __init__(self, driver):
self.driver = driver
self.networks_pool = IpNetworksPool()
self._reserve_networks()
self.home_dir = os.environ.get('DEVOPS_HOME') or os.path.join(os.environ['HOME'], ".devops")
try:
os.makedirs(os.path.join(self.home_dir, 'environments'), 0755)
except OSError:
sys.exc_clear()
def build_environment(self, environment):
logger.info("Building environment %s" % environment.name)
env_id = getattr(environment, 'id', '-'.join([environment.name, randstr()]))
environment.id = env_id
logger.debug("Creating environment working directory for %s environment" % environment.name)
environment.work_dir = os.path.join(self.home_dir, 'environments', environment.id)
os.mkdir(environment.work_dir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
logger.debug("Environment working directory has been created: %s" % environment.work_dir)
environment.driver = self.driver
for node in environment.nodes:
for interface in node.interfaces:
interface.node = node
interface.network.interfaces.append(interface)
for disk in node.disks:
if disk.base_image and disk.base_image.find('://') != -1:
disk.base_image = self._cache_file(disk.base_image)
if node.cdrom:
if node.cdrom.isopath.find('://') != -1:
node.cdrom.isopath = self._cache_file(node.cdrom.isopath)
for network in environment.networks:
logger.info("Building network %s" % network.name)
network.ip_addresses = self.networks_pool.get()
if network.pxe:
network.dhcp_server = True
tftp_path = os.path.join(environment.work_dir, "tftp")
if not os.path.exists(tftp_path):
os.mkdir(tftp_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
network.tftp_root_dir = tftp_path
if network.dhcp_server:
allocated_addresses = []
for interface in network.interfaces:
for address in interface.ip_addresses:
if address in network.ip_addresses:
allocated_addresses.append(address)
next_address_index = 2
for interface in network.interfaces:
if len(interface.ip_addresses) == 0:
while next_address_index < network.ip_addresses.numhosts and \
network.ip_addresses[next_address_index] in allocated_addresses:
next_address_index += 1
if next_address_index >= network.ip_addresses.numhosts:
raise DevopsError, "Failed to allocated IP address for node '%s' in network '%s': no more addresses left" % (node.name, network.name)
address = network.ip_addresses[next_address_index]
interface.ip_addresses.append(address)
allocated_addresses.append(next_address_index)
next_address_index += 1
network.dhcp_dynamic_address_start = network.ip_addresses[next_address_index]
network.dhcp_dynamic_address_end = network.ip_addresses[network.ip_addresses.numhosts-2]
for network in environment.networks:
logger.info("Building network %s" % network.name)
self.driver.create_network(network)
network.driver = self.driver
for node in environment.nodes:
logger.info("Building node %s" % node.name)
self._build_node(environment, node)
node.driver = self.driver
for network in environment.networks:
self.driver.create_network(network)
network.start()
environment.built = True
logger.info("Finished building environment %s" % environment.name)
def destroy_environment(self, environment):
logger.info("Destroying environment %s" % environment.name)
for node in environment.nodes:
logger.info("Destroying node %s" % node.name)
node.stop()
for snapshot in node.snapshots:
self.driver.delete_snapshot(node, snapshot)
self.driver.delete_node(node)
del node.driver
for network in environment.networks:
logger.info("Destroying network %s" % network.name)
network.stop()
self.driver.delete_network(network)
del network.driver
# FIXME
try:
self.networks_pool.put(network.ip_addresses)
except:
pass
del environment.driver
logger.info("Removing environment %s files" % environment.name)
shutil.rmtree(environment.work_dir)
logger.info("Finished destroying environment %s" % environment.name)
def load_environment(self, environment_id):
env_work_dir = os.path.join(self.home_dir, 'environments', environment_id)
env_config_file = os.path.join(env_work_dir, 'config')
if not os.path.exists(env_config_file):
raise DevopsError, "Environment '%s' couldn't be found" % environment_id
with file(env_config_file) as f:
data = f.read()
environment = my_yaml.load(data)
return environment
def save_environment(self, environment):
data = my_yaml.dump(environment)
if not environment.built:
raise DevopsError, "Environment has not been built yet."
with file(os.path.join(environment.work_dir, 'config'), 'w') as f:
f.write(data)
@property
def saved_environments(self):
saved_environments = []
for path in glob.glob(os.path.join(self.home_dir, 'environments', '*')):
if os.path.exists(os.path.join(path, 'config')):
saved_environments.append(os.path.basename(path))
return saved_environments
def _reserve_networks(self):
logger.debug("Scanning for ip networks that are already taken")
with os.popen("ip route") as f:
for line in f:
words = line.split()
if len(words) == 0:
continue
if words[0] == 'default':
continue
address = ipaddr.IPv4Network(words[0])
logger.debug("Reserving ip network %s" % address)
self.networks_pool.reserve(address)
logger.debug("Finished scanning for taken ip networks")
def _build_node(self, environment, node):
for disk in filter(lambda d: d.path is None, node.disks):
logger.debug("Creating disk file for node '%s'" % node.name)
fd, disk.path = tempfile.mkstemp(
prefix=environment.work_dir + '/disk',
suffix='.' + disk.format
)
os.close(fd)
os.chmod(disk.path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)
self.driver.create_disk(disk)
logger.debug("Creating node '%s'" % node.name)
self.driver.create_node(node)
def _cache_file(self, url):
cache_dir = os.path.join(self.home_dir, 'cache')
if not os.path.exists(cache_dir):
os.makedirs(cache_dir, 0755)
cache_log_path = os.path.join(cache_dir, 'entries')
if os.path.exists(cache_log_path):
with file(cache_log_path) as f:
cache_entries = my_yaml.load(f.read())
else:
cache_entries = dict()
ext_cache_log_path = os.path.join(cache_dir, 'extended_entries')
if os.path.exists(ext_cache_log_path):
with file(ext_cache_log_path) as f:
extended_cache_entries = my_yaml.load(f.read())
else:
extended_cache_entries = dict()
for key, value in cache_entries.items():
if not extended_cache_entries.has_key(key):
extended_cache_entries[key] = {'cached-path':value}
RFC1123_DATETIME_FORMAT = '%a, %d %b %Y %H:%M:%S %Z'
url_attrs = {}
cached_path = ''
local_mtime = 0
if extended_cache_entries.has_key(url):
url_attrs = extended_cache_entries[url]
cached_path = url_attrs['cached-path']
if url_attrs.has_key('last-modified'):
local_mtime = time.mktime(
time.strptime(url_attrs['last-modified'], RFC1123_DATETIME_FORMAT))
else:
logger.debug("Cache miss for '%s', downloading" % url)
remote = urllib.urlopen(url)
msg = remote.info()
if msg.has_key('last-modified'):
url_mtime = time.mktime(
time.strptime(msg['last-modified'], RFC1123_DATETIME_FORMAT))
else:
url_mtime = 0
if local_mtime >= url_mtime:
logger.debug("Cache is up to date for '%s': '%s'" % (
url, cached_path))
return cached_path
elif cached_path != '':
logger.debug("Cache is old for '%s', downloading" % url)
if not os.access(cached_path, os.W_OK):
try:
os.unlink(cached_path)
except Exception:
pass
fd, cached_path = tempfile.mkstemp(prefix=cache_dir+'/')
os.close(fd)
with file(cached_path, 'w') as f:
shutil.copyfileobj(remote, f)
os.chmod(cached_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
if msg.has_key('last-modified'):
url_attrs['last-modified'] = msg['last-modified']
url_attrs['cached-path'] = cached_path
extended_cache_entries[url] = url_attrs
with file(ext_cache_log_path, 'w') as f:
f.write(my_yaml.dump(extended_cache_entries))
cache_entries[url] = cached_path
with file(cache_log_path, 'w') as f:
f.write(my_yaml.dump(cache_entries))
logger.debug("Cached '%s' to '%s'" % (url, cached_path))
return cached_path

View File

@ -1,402 +1,402 @@
# vim: ts=4 sw=4 expandtab
import xml
import os
import tempfile
import time
import subprocess, shlex
from collections import deque
from xmlbuilder import XMLBuilder
import ipaddr
import re
import logging
logger = logging.getLogger('devops.libvirt')
def index(p, seq):
for i in xrange(len(seq)):
if p(seq[i]): return i
return -1
def find(p, seq):
for item in seq:
if p(item): return item
return None
def spec_priority(spec):
if spec.hypervisor == 'qemu':
return 0.5
return 1.0
class DeploymentSpec:
def __repr__(self):
return "<DeploymentSpec arch=\"%s\" os_type=\"%s\" hypervisor=\"%s\" emulator=\"%s\">" % (self.arch, self.os_type, self.hypervisor, self.emulator)
class LibvirtException(Exception): pass
class LibvirtXMLBuilder:
def build_network_xml(self, network):
network_xml = XMLBuilder('network')
network_xml.name(network.id)
network_xml.forward(mode='nat')
if hasattr(network, 'ip_addresses') and not network.ip_addresses is None:
with network_xml.ip(address=str(network.ip_addresses[1]), prefix=str(network.ip_addresses.prefixlen)):
if network.pxe:
network_xml.tftp(root=network.tftp_root_dir)
if network.dhcp_server:
with network_xml.dhcp:
if hasattr(network, 'dhcp_dynamic_address_start'):
start = network.dhcp_dynamic_address_start
else:
start = network.ip_addresses[2]
if hasattr(network, 'dhcp_dynamic_address_end'):
end = network.dhcp_dynamic_address_end
else:
end = network.ip_addresses[network.ip_addresses.numhosts-2]
network_xml.range(start=str(start), end=str(end))
for interface in network.interfaces:
address = find(lambda ip: ip in network.ip_addresses, interface.ip_addresses)
if address and interface.mac_address:
network_xml.host(mac=str(interface.mac_address), ip=str(address), name=interface.node.name)
if network.pxe:
network_xml.bootp(file="pxelinux.0")
return str(network_xml)
def build_node_xml(self, node, spec):
node_xml = XMLBuilder("domain", type=spec.hypervisor)
node_xml.name(node.id)
node_xml.vcpu(str(node.cpu))
node_xml.memory(str(node.memory*1024), unit='KiB')
with node_xml.os:
node_xml.type(spec.os_type, arch=node.arch)
for boot_dev in node.boot:
if boot_dev == 'disk':
node_xml.boot(dev="hd")
else:
node_xml.boot(dev=boot_dev)
ide_disk_names = deque(['hd'+c for c in list('abcdefghijklmnopqrstuvwxyz')])
serial_disk_names = deque(['sd'+c for c in list('abcdefghijklmnopqrstuvwxyz')])
def disk_name(bus='ide'):
if str(bus) == 'ide':
return ide_disk_names.popleft()
return serial_disk_names.popleft()
with node_xml.devices:
node_xml.emulator(spec.emulator)
if len(node.disks) > 0:
node_xml.controller(type="ide")
for disk in node.disks:
with node_xml.disk(type="file", device="disk"):
node_xml.driver(name="qemu", type=disk.format, cache="unsafe")
node_xml.source(file=disk.path)
node_xml.target(dev=disk_name(disk.bus), bus=disk.bus)
if node.cdrom:
with node_xml.disk(type="file", device="cdrom"):
node_xml.driver(name="qemu", type="raw")
node_xml.source(file=node.cdrom.isopath)
node_xml.target(dev=disk_name(node.cdrom.bus), bus=node.cdrom.bus)
for interface in node.interfaces:
with node_xml.interface(type="network"):
node_xml.source(network=interface.network.id)
if node.vnc:
node_xml.graphics(type='vnc', listen='0.0.0.0', autoport='yes')
return str(node_xml)
class Libvirt:
def __init__(self, xml_builder = LibvirtXMLBuilder()):
self.xml_builder = xml_builder
self._init_capabilities()
def node_exists(self, node_name):
return self._system("virsh dominfo '%s'" % node_name, expected_resultcodes=(0, 1)) == 0
def network_exists(self, network_name):
return self._system("virsh net-info '%s' 2>/dev/null" % network_name, expected_resultcodes=(0, 1)) == 0
def create_network(self, network):
if not hasattr(network, 'id') or network.id is None:
network.id = self._generate_network_id(network.name)
elif self.is_network_defined(network):
self._virsh("net-undefine '%s'", network.id)
with tempfile.NamedTemporaryFile(delete=True) as xml_file:
network_xml = self.xml_builder.build_network_xml(network)
logger.debug("libvirt: Building network with following XML:\n%s" % network_xml)
xml_file.write(network_xml)
xml_file.flush()
self._virsh("net-define '%s'", xml_file.name)
with os.popen("virsh net-dumpxml '%s'" % network.id) as f:
network_element = xml.parse_stream(f)
network.bridge_name = network_element.find('bridge/@name')
network.mac_address = network_element.find('mac/@address')
def delete_network(self, network):
if self.is_network_defined(network):
logger.debug("Network %s is defined. Undefining.")
self._virsh("net-undefine '%s'", network.id)
def start_network(self, network):
if not self.is_network_running(network):
logger.debug("Network %s is not running. Starting.")
self._virsh("net-start '%s'", network.id)
def stop_network(self, network):
if self.is_network_running(network):
logger.debug("Network %s is running. Stopping.")
self._virsh("net-destroy '%s'", network.id)
def _get_node_xml(self, node):
with os.popen("virsh dumpxml '%s'" % node.id) as f:
return xml.parse_stream(f)
def create_node(self, node):
specs = filter(lambda spec: spec.arch == node.arch, self.specs)
if len(specs) == 0:
raise LibvirtException, "Can't create node %s: insufficient capabilities" % node.name
specs.sort(key=spec_priority)
spec = specs[-1]
if not hasattr(node, 'id') or node.id is None:
node.id = self._generate_node_id(node.name)
with tempfile.NamedTemporaryFile(delete=True) as xml_file:
node_xml = self.xml_builder.build_node_xml(node, spec)
logger.debug("libvirt: Building node with following XML:\n%s" % node_xml)
xml_file.write(node_xml)
xml_file.flush()
self._virsh("define '%s'", xml_file.name)
domain = self._get_node_xml(node)
for interface_element in domain.find_all('devices/interface[@type="network"]'):
network_id = interface_element.find('source/@network')
interface = find(lambda i: i.network.id == network_id, node.interfaces)
if interface is None:
continue
interface.mac_address = interface_element.find('mac/@address')
def delete_node(self, node):
if self.is_node_defined(node):
logger.debug("Node %s defined. Undefining." % node.id)
self._virsh("undefine '%s'", node.id)
def start_node(self, node):
if not self.is_node_running(node):
logger.debug("Node %s is not running at the moment. Starting." % node.id)
self._virsh("start '%s'", node.id)
if node.vnc:
domain = self._get_node_xml(node)
port_text = domain.find('devices/graphics[@type="vnc"]/@port')
if port_text: node.vnc_port = int(port_text)
def stop_node(self, node):
if self.is_node_running(node):
logger.debug("Node %s is running at the moment. Stopping." % node.id)
self._virsh("destroy '%s'", node.id)
def reset_node(self, node):
self._virsh("reset '%s'", node.id)
def reboot_node(self, node):
self._virsh("reboot '%s'", node.id)
def shutdown_node(self, node):
self._virsh("stop '%s'", node.id)
def get_node_snapshots(self, node):
command = "virsh snapshot-list '%s'" % node.id
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.wait()
if process.returncode != 0:
logger.error("Command '%s' returned %d, stderr: %s" % (command, process.returncode, process.stderr.read()))
else:
logger.debug("Command '%s' returned %d" % (command, process.returncode))
snapshot_ids = []
for line in process.stdout.readlines()[2:]:
if line.strip() == '': continue
snapshot_ids.append(line.split()[0])
return snapshot_ids
def create_snapshot(self, node, name=None, description=None):
if not name:
name = str(int(time.time()*100))
with tempfile.NamedTemporaryFile(delete=True) as xml_file:
snapshot_xml = XMLBuilder('domainsnapshot')
snapshot_xml.name(name)
if description:
snapshot_xml.description(description)
logger.debug("Building snapshot with following XML:\n%s" % str(snapshot_xml))
xml_file.write(str(snapshot_xml))
xml_file.flush()
self._virsh("snapshot-create '%s' '%s'", node.id, xml_file.name)
return name
def revert_snapshot(self, node, snapshot_name=None):
if not snapshot_name:
snapshot_name = '--current'
self._virsh("snapshot-revert '%s' %s", node.id, snapshot_name)
def delete_snapshot(self, node, snapshot_name=None):
if not snapshot_name:
snapshot_name = '--current'
self._virsh("snapshot-delete '%s' %s", node.id, snapshot_name)
def send_keys_to_node(self, node, keys):
keys = scancodes.from_string(str(keys))
for key_codes in keys:
if isinstance(key_codes[0], str):
if key_codes[0] == 'wait':
time.sleep(1)
continue
self._virsh("send-key '%s' %s", node.id, ' '.join(map(lambda x: str(x), key_codes)))
def create_disk(self, disk):
if not disk.path:
f, disk.path = tempfile.mkstemp(prefix='disk-', suffix=(".%s" % disk.format))
os.close(f)
if disk.base_image:
self._system("qemu-img create -f '%(format)s' -b '%(backing_path)s' '%(path)s'" % {'format': disk.format, 'path': disk.path, 'backing_path': disk.base_image})
else:
self._system("qemu-img create -f '%(format)s' '%(path)s' '%(size)s'" % {'format': disk.format, 'path': disk.path, 'size': disk.size})
def delete_disk(self, disk):
if disk.path is None: return
os.unlink(disk.path)
def get_interface_addresses(self, interface):
command = "arp -an | awk '$4 == \"%(mac)s\" && $7 == \"%(interface)s\" {print substr($2, 2, length($2)-2)}'" % { 'mac': interface.mac_address, 'interface': interface.network.bridge_name}
with os.popen(command) as f:
return [ipaddr.IPv4Address(s) for s in f.read().split()]
def _virsh(self, format, *args):
command = ("virsh " + format) % args
logger.debug("Running '%s'" % command)
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.wait()
if process.returncode != 0:
logger.error("Command '%s' returned code %d:\n%s" % (command, process.returncode, process.stderr.read()))
raise LibvirtException, "Failed to execute command '%s'" % command
def _init_capabilities(self):
with os.popen("virsh capabilities") as f:
capabilities = xml.parse_stream(f)
self.specs = []
for guest in capabilities.find_all('guest'):
for arch in guest.find_all('arch'):
for domain in arch.find_all('domain'):
spec = DeploymentSpec()
spec.arch = arch['name']
spec.os_type = guest.find('os_type/text()')
spec.hypervisor = domain['type']
spec.emulator = (domain.find('emulator') or arch.find('emulator')).text
self.specs.append(spec)
def _generate_network_id(self, name='net'):
while True:
id = name + '-' + str(int(time.time()*100))
if not self.network_exists(id):
return id
def _generate_node_id(self, name='node'):
while True:
id = name + '-' + str(int(time.time()*100))
if not self.node_exists(id):
return id
def is_node_defined(self, node):
return self._system2("virsh list --all | grep -q ' %s '" % node.id, expected_resultcodes=(0, 1)) == 0
def is_node_running(self, node):
return self._system2("virsh list | grep -q ' %s '" % node.id, expected_resultcodes=(0, 1)) == 0
def is_network_defined(self, network):
return self._system2("virsh net-list --all | grep -q '%s '" % network.id, expected_resultcodes=(0, 1)) == 0
def is_network_running(self, network):
return self._system2("virsh net-list | grep -q '%s '" % network.id, expected_resultcodes=(0, 1)) == 0
def _system2(self, command, expected_resultcodes=(0,)):
logger.debug("Running %s" % command)
commands = [ i.strip() for i in re.split(ur'\|', command)]
serr = []
process = []
process.append(subprocess.Popen(shlex.split(commands[0]), stdin=None,
stdout=subprocess.PIPE, stderr=subprocess.PIPE))
for c in commands[1:]:
process.append(subprocess.Popen(shlex.split(c), stdin=process[-1].stdout,
stdout=subprocess.PIPE, stderr=subprocess.PIPE))
process[-1].wait()
for p in process:
serr += [ err.strip() for err in p.stderr.readlines() ]
returncode = process[-1].returncode
if expected_resultcodes and not returncode in expected_resultcodes:
logger.error("Command '%s' returned %d, stderr: %s" % (command, returncode, '\n'.join(serr)))
else:
logger.debug("Command '%s' returned %d" % (command, returncode))
return returncode
def _system(self, command, expected_resultcodes=(0,)):
logger.debug("Running '%s'" % command)
serr = []
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.wait()
serr += [ err.strip() for err in process.stderr.readlines() ]
if expected_resultcodes and not process.returncode in expected_resultcodes:
logger.error("Command '%s' returned %d, stderr: %s" % (command, process.returncode, '\n'.join(serr)))
else:
logger.debug("Command '%s' returned %d" % (command, process.returncode))
return process.returncode
# vim: ts=4 sw=4 expandtab
import xml
import os
import tempfile
import time
import subprocess, shlex
from collections import deque
from xmlbuilder import XMLBuilder
import ipaddr
import re
import logging
logger = logging.getLogger('devops.libvirt')
def index(p, seq):
for i in xrange(len(seq)):
if p(seq[i]): return i
return -1
def find(p, seq):
for item in seq:
if p(item): return item
return None
def spec_priority(spec):
if spec.hypervisor == 'qemu':
return 0.5
return 1.0
class DeploymentSpec:
def __repr__(self):
return "<DeploymentSpec arch=\"%s\" os_type=\"%s\" hypervisor=\"%s\" emulator=\"%s\">" % (self.arch, self.os_type, self.hypervisor, self.emulator)
class LibvirtException(Exception): pass
class LibvirtXMLBuilder:
def build_network_xml(self, network):
network_xml = XMLBuilder('network')
network_xml.name(network.id)
network_xml.forward(mode='nat')
if hasattr(network, 'ip_addresses') and not network.ip_addresses is None:
with network_xml.ip(address=str(network.ip_addresses[1]), prefix=str(network.ip_addresses.prefixlen)):
if network.pxe:
network_xml.tftp(root=network.tftp_root_dir)
if network.dhcp_server:
with network_xml.dhcp:
if hasattr(network, 'dhcp_dynamic_address_start'):
start = network.dhcp_dynamic_address_start
else:
start = network.ip_addresses[2]
if hasattr(network, 'dhcp_dynamic_address_end'):
end = network.dhcp_dynamic_address_end
else:
end = network.ip_addresses[network.ip_addresses.numhosts-2]
network_xml.range(start=str(start), end=str(end))
for interface in network.interfaces:
address = find(lambda ip: ip in network.ip_addresses, interface.ip_addresses)
if address and interface.mac_address:
network_xml.host(mac=str(interface.mac_address), ip=str(address), name=interface.node.name)
if network.pxe:
network_xml.bootp(file="pxelinux.0")
return str(network_xml)
def build_node_xml(self, node, spec):
node_xml = XMLBuilder("domain", type=spec.hypervisor)
node_xml.name(node.id)
node_xml.vcpu(str(node.cpu))
node_xml.memory(str(node.memory*1024), unit='KiB')
with node_xml.os:
node_xml.type(spec.os_type, arch=node.arch)
for boot_dev in node.boot:
if boot_dev == 'disk':
node_xml.boot(dev="hd")
else:
node_xml.boot(dev=boot_dev)
ide_disk_names = deque(['hd'+c for c in list('abcdefghijklmnopqrstuvwxyz')])
serial_disk_names = deque(['sd'+c for c in list('abcdefghijklmnopqrstuvwxyz')])
def disk_name(bus='ide'):
if str(bus) == 'ide':
return ide_disk_names.popleft()
return serial_disk_names.popleft()
with node_xml.devices:
node_xml.emulator(spec.emulator)
if len(node.disks) > 0:
node_xml.controller(type="ide")
for disk in node.disks:
with node_xml.disk(type="file", device="disk"):
node_xml.driver(name="qemu", type=disk.format, cache="unsafe")
node_xml.source(file=disk.path)
node_xml.target(dev=disk_name(disk.bus), bus=disk.bus)
if node.cdrom:
with node_xml.disk(type="file", device="cdrom"):
node_xml.driver(name="qemu", type="raw")
node_xml.source(file=node.cdrom.isopath)
node_xml.target(dev=disk_name(node.cdrom.bus), bus=node.cdrom.bus)
for interface in node.interfaces:
with node_xml.interface(type="network"):
node_xml.source(network=interface.network.id)
if node.vnc:
node_xml.graphics(type='vnc', listen='0.0.0.0', autoport='yes')
return str(node_xml)
class Libvirt:
def __init__(self, xml_builder = LibvirtXMLBuilder()):
self.xml_builder = xml_builder
self._init_capabilities()
def node_exists(self, node_name):
return self._system("virsh dominfo '%s'" % node_name, expected_resultcodes=(0, 1)) == 0
def network_exists(self, network_name):
return self._system("virsh net-info '%s' 2>/dev/null" % network_name, expected_resultcodes=(0, 1)) == 0
def create_network(self, network):
if not hasattr(network, 'id') or network.id is None:
network.id = self._generate_network_id(network.name)
elif self.is_network_defined(network):
self._virsh("net-undefine '%s'", network.id)
with tempfile.NamedTemporaryFile(delete=True) as xml_file:
network_xml = self.xml_builder.build_network_xml(network)
logger.debug("libvirt: Building network with following XML:\n%s" % network_xml)
xml_file.write(network_xml)
xml_file.flush()
self._virsh("net-define '%s'", xml_file.name)
with os.popen("virsh net-dumpxml '%s'" % network.id) as f:
network_element = xml.parse_stream(f)
network.bridge_name = network_element.find('bridge/@name')
network.mac_address = network_element.find('mac/@address')
def delete_network(self, network):
if self.is_network_defined(network):
logger.debug("Network %s is defined. Undefining.")
self._virsh("net-undefine '%s'", network.id)
def start_network(self, network):
if not self.is_network_running(network):
logger.debug("Network %s is not running. Starting.")
self._virsh("net-start '%s'", network.id)
def stop_network(self, network):
if self.is_network_running(network):
logger.debug("Network %s is running. Stopping.")
self._virsh("net-destroy '%s'", network.id)
def _get_node_xml(self, node):
with os.popen("virsh dumpxml '%s'" % node.id) as f:
return xml.parse_stream(f)
def create_node(self, node):
specs = filter(lambda spec: spec.arch == node.arch, self.specs)
if len(specs) == 0:
raise LibvirtException, "Can't create node %s: insufficient capabilities" % node.name
specs.sort(key=spec_priority)
spec = specs[-1]
if not hasattr(node, 'id') or node.id is None:
node.id = self._generate_node_id(node.name)
with tempfile.NamedTemporaryFile(delete=True) as xml_file:
node_xml = self.xml_builder.build_node_xml(node, spec)
logger.debug("libvirt: Building node with following XML:\n%s" % node_xml)
xml_file.write(node_xml)
xml_file.flush()
self._virsh("define '%s'", xml_file.name)
domain = self._get_node_xml(node)
for interface_element in domain.find_all('devices/interface[@type="network"]'):
network_id = interface_element.find('source/@network')
interface = find(lambda i: i.network.id == network_id, node.interfaces)
if interface is None:
continue
interface.mac_address = interface_element.find('mac/@address')
def delete_node(self, node):
if self.is_node_defined(node):
logger.debug("Node %s defined. Undefining." % node.id)
self._virsh("undefine '%s'", node.id)
def start_node(self, node):
if not self.is_node_running(node):
logger.debug("Node %s is not running at the moment. Starting." % node.id)
self._virsh("start '%s'", node.id)
if node.vnc:
domain = self._get_node_xml(node)
port_text = domain.find('devices/graphics[@type="vnc"]/@port')
if port_text: node.vnc_port = int(port_text)
def stop_node(self, node):
if self.is_node_running(node):
logger.debug("Node %s is running at the moment. Stopping." % node.id)
self._virsh("destroy '%s'", node.id)
def reset_node(self, node):
self._virsh("reset '%s'", node.id)
def reboot_node(self, node):
self._virsh("reboot '%s'", node.id)
def shutdown_node(self, node):
self._virsh("stop '%s'", node.id)
def get_node_snapshots(self, node):
command = "virsh snapshot-list '%s'" % node.id
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.wait()
if process.returncode != 0:
logger.error("Command '%s' returned %d, stderr: %s" % (command, process.returncode, process.stderr.read()))
else:
logger.debug("Command '%s' returned %d" % (command, process.returncode))
snapshot_ids = []
for line in process.stdout.readlines()[2:]:
if line.strip() == '': continue
snapshot_ids.append(line.split()[0])
return snapshot_ids
def create_snapshot(self, node, name=None, description=None):
if not name:
name = str(int(time.time()*100))
with tempfile.NamedTemporaryFile(delete=True) as xml_file:
snapshot_xml = XMLBuilder('domainsnapshot')
snapshot_xml.name(name)
if description:
snapshot_xml.description(description)
logger.debug("Building snapshot with following XML:\n%s" % str(snapshot_xml))
xml_file.write(str(snapshot_xml))
xml_file.flush()
self._virsh("snapshot-create '%s' '%s'", node.id, xml_file.name)
return name
def revert_snapshot(self, node, snapshot_name=None):
if not snapshot_name:
snapshot_name = '--current'
self._virsh("snapshot-revert '%s' %s", node.id, snapshot_name)
def delete_snapshot(self, node, snapshot_name=None):
if not snapshot_name:
snapshot_name = '--current'
self._virsh("snapshot-delete '%s' %s", node.id, snapshot_name)
def send_keys_to_node(self, node, keys):
keys = scancodes.from_string(str(keys))
for key_codes in keys:
if isinstance(key_codes[0], str):
if key_codes[0] == 'wait':
time.sleep(1)
continue
self._virsh("send-key '%s' %s", node.id, ' '.join(map(lambda x: str(x), key_codes)))
def create_disk(self, disk):
if not disk.path:
f, disk.path = tempfile.mkstemp(prefix='disk-', suffix=(".%s" % disk.format))
os.close(f)
if disk.base_image:
self._system("qemu-img create -f '%(format)s' -b '%(backing_path)s' '%(path)s'" % {'format': disk.format, 'path': disk.path, 'backing_path': disk.base_image})
else:
self._system("qemu-img create -f '%(format)s' '%(path)s' '%(size)s'" % {'format': disk.format, 'path': disk.path, 'size': disk.size})
def delete_disk(self, disk):
if disk.path is None: return
os.unlink(disk.path)
def get_interface_addresses(self, interface):
command = "arp -an | awk '$4 == \"%(mac)s\" && $7 == \"%(interface)s\" {print substr($2, 2, length($2)-2)}'" % { 'mac': interface.mac_address, 'interface': interface.network.bridge_name}
with os.popen(command) as f:
return [ipaddr.IPv4Address(s) for s in f.read().split()]
def _virsh(self, format, *args):
command = ("virsh " + format) % args
logger.debug("Running '%s'" % command)
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.wait()
if process.returncode != 0:
logger.error("Command '%s' returned code %d:\n%s" % (command, process.returncode, process.stderr.read()))
raise LibvirtException, "Failed to execute command '%s'" % command
def _init_capabilities(self):
with os.popen("virsh capabilities") as f:
capabilities = xml.parse_stream(f)
self.specs = []
for guest in capabilities.find_all('guest'):
for arch in guest.find_all('arch'):
for domain in arch.find_all('domain'):
spec = DeploymentSpec()
spec.arch = arch['name']
spec.os_type = guest.find('os_type/text()')
spec.hypervisor = domain['type']
spec.emulator = (domain.find('emulator') or arch.find('emulator')).text
self.specs.append(spec)
def _generate_network_id(self, name='net'):
while True:
id = name + '-' + str(int(time.time()*100))
if not self.network_exists(id):
return id
def _generate_node_id(self, name='node'):
while True:
id = name + '-' + str(int(time.time()*100))
if not self.node_exists(id):
return id
def is_node_defined(self, node):
return self._system2("virsh list --all | grep -q ' %s '" % node.id, expected_resultcodes=(0, 1)) == 0
def is_node_running(self, node):
return self._system2("virsh list | grep -q ' %s '" % node.id, expected_resultcodes=(0, 1)) == 0
def is_network_defined(self, network):
return self._system2("virsh net-list --all | grep -q '%s '" % network.id, expected_resultcodes=(0, 1)) == 0
def is_network_running(self, network):
return self._system2("virsh net-list | grep -q '%s '" % network.id, expected_resultcodes=(0, 1)) == 0
def _system2(self, command, expected_resultcodes=(0,)):
logger.debug("Running %s" % command)
commands = [ i.strip() for i in re.split(ur'\|', command)]
serr = []
process = []
process.append(subprocess.Popen(shlex.split(commands[0]), stdin=None,
stdout=subprocess.PIPE, stderr=subprocess.PIPE))
for c in commands[1:]:
process.append(subprocess.Popen(shlex.split(c), stdin=process[-1].stdout,
stdout=subprocess.PIPE, stderr=subprocess.PIPE))
process[-1].wait()
for p in process:
serr += [ err.strip() for err in p.stderr.readlines() ]
returncode = process[-1].returncode
if expected_resultcodes and not returncode in expected_resultcodes:
logger.error("Command '%s' returned %d, stderr: %s" % (command, returncode, '\n'.join(serr)))
else:
logger.debug("Command '%s' returned %d" % (command, returncode))
return returncode
def _system(self, command, expected_resultcodes=(0,)):
logger.debug("Running '%s'" % command)
serr = []
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.wait()
serr += [ err.strip() for err in process.stderr.readlines() ]
if expected_resultcodes and not process.returncode in expected_resultcodes:
logger.error("Command '%s' returned %d, stderr: %s" % (command, process.returncode, '\n'.join(serr)))
else:
logger.debug("Command '%s' returned %d" % (command, process.returncode))
return process.returncode

View File

@ -1,289 +1,289 @@
import os
import os.path
import urllib
import stat
import socket
import time
import httplib
import xmlrpclib
import paramiko
import string
import random
from threading import Thread
import BaseHTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
import posixpath
import logging
from devops.error import DevopsError
logger = logging.getLogger('devops.helpers')
class TimeoutError(Exception): pass
class AuthenticationError(Exception): pass
def get_free_port():
ports = range(32000, 32100)
random.shuffle(ports)
for port in ports:
if not tcp_ping('localhost', port):
return port
raise DevopsError, "No free ports available"
def icmp_ping(host, timeout=1):
"icmp_ping(host, timeout=1) - returns True if host is pingable; False - otherwise."
return os.system("ping -c 1 -W '%(timeout)d' '%(host)s' 1>/dev/null 2>&1" % { 'host': str(host), 'timeout': timeout}) == 0
def tcp_ping(host, port):
"tcp_ping(host, port) - returns True if TCP connection to specified host and port can be established; False - otherwise."
s = socket.socket()
try:
s.connect((str(host), int(port)))
except socket.error:
return False
s.close()
return True
def wait(predicate, interval=5, timeout=None):
"""
wait(predicate, interval=5, timeout=None) - wait until predicate will become True. Returns number of seconds that is left or 0 if timeout is None.
Options:
interval - seconds between checks.
timeout - raise TimeoutError if predicate won't become True after this amount of seconds. 'None' disables timeout.
"""
start_time = time.time()
while not predicate():
if timeout and start_time + timeout < time.time():
raise TimeoutError, "Waiting timed out"
seconds_to_sleep = interval
if timeout:
seconds_to_sleep = max(0, min(seconds_to_sleep, start_time + timeout - time.time()))
time.sleep(seconds_to_sleep)
return timeout + start_time - time.time() if timeout else 0
def http(host='localhost', port=80, method='GET', url='/', waited_code=200):
try:
conn = httplib.HTTPConnection(str(host), int(port))
conn.request(method, url)
res = conn.getresponse()
if res.status == waited_code:
return True
return False
except:
return False
class KeyPolicy(paramiko.WarningPolicy):
def missing_host_key(self, client, hostname, key):
return
class SSHClient(object):
class get_sudo(object):
def __init__(self, ssh):
self.ssh = ssh
def __enter__(self):
self.ssh.sudo_mode = True
def __exit__(self, type, value, traceback):
self.ssh.sudo_mode = False
def __init__(self, host, port=22, username=None, password=None):
self.host = str(host)
self.port = int(port)
self.username = username
self.password = password
self.sudo_mode = False
self.sudo = self.get_sudo(self)
self.reconnect()
def __del__(self):
self._sftp.close()
self._ssh.close()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
pass
def reconnect(self):
self._ssh = paramiko.SSHClient()
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self._ssh.connect(self.host, port=self.port, username=self.username, password=self.password)
self._sftp = self._ssh.open_sftp()
def execute(self, command):
logger.debug("Executing command: '%s'" % command.rstrip())
chan = self._ssh.get_transport().open_session()
stdin = chan.makefile('wb')
stdout = chan.makefile('rb')
stderr = chan.makefile_stderr('rb')
cmd = "%s\n" % command
if self.sudo_mode:
cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"')
chan.exec_command(cmd)
if stdout.channel.closed is False:
stdin.write('%s\n' % self.password)
stdin.flush()
result = {
'stdout': [],
'stderr': [],
'exit_code': chan.recv_exit_status()
}
for line in stdout:
result['stdout'].append(line)
for line in stderr:
result['stderr'].append(line)
chan.close()
return result
def mkdir(self, path):
if self.exists(path):
return
logger.debug("Creating directory: %s" % path)
self.execute("mkdir -p %s\n" % path)
def rm_rf(self, path):
logger.debug("Removing directory: %s" % path)
self.execute("rm -rf %s" % path)
def open(self, path, mode='r'):
return self._sftp.open(path, mode)
def upload(self, source, target):
logger.debug("Copying '%s' -> '%s'" % (source, target))
if self.isdir(target):
target = posixpath.join(target, os.path.basename(source))
if not os.path.isdir(source):
self._sftp.put(source, target)
return
for rootdir, subdirs, files in os.walk(source):
targetdir = os.path.normpath(os.path.join(target, os.path.relpath(rootdir, source))).replace("\\", "/")
self.mkdir(targetdir)
for entry in files:
local_path = os.path.join(rootdir, entry)
remote_path = posixpath.join(targetdir, entry)
if self.exists(remote_path):
self._sftp.unlink(remote_path)
self._sftp.put(local_path, remote_path)
def exists(self, path):
try:
self._sftp.lstat(path)
return True
except IOError:
return False
def isfile(self, path):
try:
attrs = self._sftp.lstat(path)
return attrs.st_mode & stat.S_IFREG != 0
except IOError:
return False
def isdir(self, path):
try:
attrs = self._sftp.lstat(path)
return attrs.st_mode & stat.S_IFDIR != 0
except IOError:
return False
def ssh(*args, **kwargs):
return SSHClient(*args, **kwargs)
class HttpServer:
class Handler(SimpleHTTPRequestHandler):
logger = logging.getLogger('devops.helpers.http_server')
def __init__(self, docroot, *args, **kwargs):
self.docroot = docroot
SimpleHTTPRequestHandler.__init__(self, *args, **kwargs)
# Suppress reverse DNS lookups to speed up processing
def address_string(self):
return self.client_address[0]
# Handle docroot
def translate_path(self, path):
"""Translate a /-separated PATH to the local filename syntax.
Components that mean special things to the local file system
(e.g. drive or directory names) are ignored. (XXX They should
probably be diagnosed.)
"""
# abandon query parameters
path = path.split('?',1)[0]
path = path.split('#',1)[0]
path = posixpath.normpath(urllib.unquote(path))
words = path.split('/')
words = filter(None, words)
path = self.docroot
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
path = os.path.join(path, word)
return path
def log_message(self, format, *args):
self.logger.info(format % args)
def __init__(self, document_root):
self.port = get_free_port()
self.document_root = document_root
def handler_factory(*args, **kwargs):
return HttpServer.Handler(document_root, *args, **kwargs)
self._server = BaseHTTPServer.HTTPServer(('', self.port), handler_factory)
self._thread = Thread(target=self._server.serve_forever)
self._thread.daemon = True
def start(self):
self._thread.start()
def run(self):
self._thread.join()
def stop(self):
self._server.shutdown()
self._thread.join()
def http_server(document_root):
server = HttpServer(document_root)
server.start()
return server
def xmlrpctoken(uri, login, password):
server = xmlrpclib.Server(uri)
try:
return server.login(login, password)
except:
raise AuthenticationError, "Error occured while login process"
def xmlrpcmethod(uri, method):
server = xmlrpclib.Server(uri)
try:
return getattr(server, method)
except:
raise AttributeError, "Error occured while getting server method"
import os
import os.path
import urllib
import stat
import socket
import time
import httplib
import xmlrpclib
import paramiko
import string
import random
from threading import Thread
import BaseHTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
import posixpath
import logging
from devops.error import DevopsError
logger = logging.getLogger('devops.helpers')
class TimeoutError(Exception): pass
class AuthenticationError(Exception): pass
def get_free_port():
ports = range(32000, 32100)
random.shuffle(ports)
for port in ports:
if not tcp_ping('localhost', port):
return port
raise DevopsError, "No free ports available"
def icmp_ping(host, timeout=1):
"icmp_ping(host, timeout=1) - returns True if host is pingable; False - otherwise."
return os.system("ping -c 1 -W '%(timeout)d' '%(host)s' 1>/dev/null 2>&1" % { 'host': str(host), 'timeout': timeout}) == 0
def tcp_ping(host, port):
"tcp_ping(host, port) - returns True if TCP connection to specified host and port can be established; False - otherwise."
s = socket.socket()
try:
s.connect((str(host), int(port)))
except socket.error:
return False
s.close()
return True
def wait(predicate, interval=5, timeout=None):
"""
wait(predicate, interval=5, timeout=None) - wait until predicate will become True. Returns number of seconds that is left or 0 if timeout is None.
Options:
interval - seconds between checks.
timeout - raise TimeoutError if predicate won't become True after this amount of seconds. 'None' disables timeout.
"""
start_time = time.time()
while not predicate():
if timeout and start_time + timeout < time.time():
raise TimeoutError, "Waiting timed out"
seconds_to_sleep = interval
if timeout:
seconds_to_sleep = max(0, min(seconds_to_sleep, start_time + timeout - time.time()))
time.sleep(seconds_to_sleep)
return timeout + start_time - time.time() if timeout else 0
def http(host='localhost', port=80, method='GET', url='/', waited_code=200):
try:
conn = httplib.HTTPConnection(str(host), int(port))
conn.request(method, url)
res = conn.getresponse()
if res.status == waited_code:
return True
return False
except:
return False
class KeyPolicy(paramiko.WarningPolicy):
def missing_host_key(self, client, hostname, key):
return
class SSHClient(object):
class get_sudo(object):
def __init__(self, ssh):
self.ssh = ssh
def __enter__(self):
self.ssh.sudo_mode = True
def __exit__(self, type, value, traceback):
self.ssh.sudo_mode = False
def __init__(self, host, port=22, username=None, password=None):
self.host = str(host)
self.port = int(port)
self.username = username
self.password = password
self.sudo_mode = False
self.sudo = self.get_sudo(self)
self.reconnect()
def __del__(self):
self._sftp.close()
self._ssh.close()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
pass
def reconnect(self):
self._ssh = paramiko.SSHClient()
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self._ssh.connect(self.host, port=self.port, username=self.username, password=self.password)
self._sftp = self._ssh.open_sftp()
def execute(self, command):
logger.debug("Executing command: '%s'" % command.rstrip())
chan = self._ssh.get_transport().open_session()
stdin = chan.makefile('wb')
stdout = chan.makefile('rb')
stderr = chan.makefile_stderr('rb')
cmd = "%s\n" % command
if self.sudo_mode:
cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"')
chan.exec_command(cmd)
if stdout.channel.closed is False:
stdin.write('%s\n' % self.password)
stdin.flush()
result = {
'stdout': [],
'stderr': [],
'exit_code': chan.recv_exit_status()
}
for line in stdout:
result['stdout'].append(line)
for line in stderr:
result['stderr'].append(line)
chan.close()
return result
def mkdir(self, path):
if self.exists(path):
return
logger.debug("Creating directory: %s" % path)
self.execute("mkdir -p %s\n" % path)
def rm_rf(self, path):
logger.debug("Removing directory: %s" % path)
self.execute("rm -rf %s" % path)
def open(self, path, mode='r'):
return self._sftp.open(path, mode)
def upload(self, source, target):
logger.debug("Copying '%s' -> '%s'" % (source, target))
if self.isdir(target):
target = posixpath.join(target, os.path.basename(source))
if not os.path.isdir(source):
self._sftp.put(source, target)
return
for rootdir, subdirs, files in os.walk(source):
targetdir = os.path.normpath(os.path.join(target, os.path.relpath(rootdir, source))).replace("\\", "/")
self.mkdir(targetdir)
for entry in files:
local_path = os.path.join(rootdir, entry)
remote_path = posixpath.join(targetdir, entry)
if self.exists(remote_path):
self._sftp.unlink(remote_path)
self._sftp.put(local_path, remote_path)
def exists(self, path):
try:
self._sftp.lstat(path)
return True
except IOError:
return False
def isfile(self, path):
try:
attrs = self._sftp.lstat(path)
return attrs.st_mode & stat.S_IFREG != 0
except IOError:
return False
def isdir(self, path):
try:
attrs = self._sftp.lstat(path)
return attrs.st_mode & stat.S_IFDIR != 0
except IOError:
return False
def ssh(*args, **kwargs):
return SSHClient(*args, **kwargs)
class HttpServer:
class Handler(SimpleHTTPRequestHandler):
logger = logging.getLogger('devops.helpers.http_server')
def __init__(self, docroot, *args, **kwargs):
self.docroot = docroot
SimpleHTTPRequestHandler.__init__(self, *args, **kwargs)
# Suppress reverse DNS lookups to speed up processing
def address_string(self):
return self.client_address[0]
# Handle docroot
def translate_path(self, path):
"""Translate a /-separated PATH to the local filename syntax.
Components that mean special things to the local file system
(e.g. drive or directory names) are ignored. (XXX They should
probably be diagnosed.)
"""
# abandon query parameters
path = path.split('?',1)[0]
path = path.split('#',1)[0]
path = posixpath.normpath(urllib.unquote(path))
words = path.split('/')
words = filter(None, words)
path = self.docroot
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
path = os.path.join(path, word)
return path
def log_message(self, format, *args):
self.logger.info(format % args)
def __init__(self, document_root):
self.port = get_free_port()
self.document_root = document_root
def handler_factory(*args, **kwargs):
return HttpServer.Handler(document_root, *args, **kwargs)
self._server = BaseHTTPServer.HTTPServer(('', self.port), handler_factory)
self._thread = Thread(target=self._server.serve_forever)
self._thread.daemon = True
def start(self):
self._thread.start()
def run(self):
self._thread.join()
def stop(self):
self._server.shutdown()
self._thread.join()
def http_server(document_root):
server = HttpServer(document_root)
server.start()
return server
def xmlrpctoken(uri, login, password):
server = xmlrpclib.Server(uri)
try:
return server.login(login, password)
except:
raise AuthenticationError, "Error occured while login process"
def xmlrpcmethod(uri, method):
server = xmlrpclib.Server(uri)
try:
return getattr(server, method)
except:
raise AttributeError, "Error occured while getting server method"

View File

@ -1,39 +1,39 @@
import logging
from error import DevopsError
from controller import Controller
from driver.libvirt import Libvirt
import yaml_config_loader
__all__ = ['logger', 'getController', 'build', 'destroy', 'load', 'save']
logger = logging.getLogger('devops')
class ControllerSingleton(Controller):
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(ControllerSingleton, cls).__new__(
cls, *args, **kwargs)
return cls._instance
def getController():
return ControllerSingleton(Libvirt())
def build(environment):
getController().build_environment(environment)
def destroy(environment):
getController().destroy_environment(environment)
def load(source):
source = str(source).strip()
if source.find("\n") == -1:
if not source in getController().saved_environments:
raise DevopsError, "Environment '%s' does not exist" % source
return getController().load_environment(source)
return yaml_config_loader.load(source)
def save(environment):
getController().save_environment(environment)
import logging
from error import DevopsError
from controller import Controller
from driver.libvirt import Libvirt
import yaml_config_loader
__all__ = ['logger', 'getController', 'build', 'destroy', 'load', 'save']
logger = logging.getLogger('devops')
class ControllerSingleton(Controller):
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(ControllerSingleton, cls).__new__(
cls, *args, **kwargs)
return cls._instance
def getController():
return ControllerSingleton(Libvirt())
def build(environment):
getController().build_environment(environment)
def destroy(environment):
getController().destroy_environment(environment)
def load(source):
source = str(source).strip()
if source.find("\n") == -1:
if not source in getController().saved_environments:
raise DevopsError, "Environment '%s' does not exist" % source
return getController().load_environment(source)
return yaml_config_loader.load(source)
def save(environment):
getController().save_environment(environment)

View File

@ -1,44 +1,44 @@
import ipaddr
from itertools import chain
IPv4Address = ipaddr.IPv4Address
IPv4Network = ipaddr.IPv4Network
class NetworkPoolException(Exception): pass
class IpNetworksPool:
def __init__(self, net_addresses=None, prefix=24):
if not net_addresses: net_addresses = ['10.0.0.0/20']
networks = []
for address in net_addresses:
if not isinstance(address, IPv4Network):
address = IPv4Network(str(address))
networks.append(address)
self._available_networks = set(chain(*[net_address.iter_subnets(new_prefix=prefix) for net_address in networks]))
self._allocated_networks = set()
def reserve(self, network):
for overlaping_network in filter(lambda n: n.overlaps(network), self._available_networks):
self._available_networks.remove(overlaping_network)
def get(self):
"get() - allocates and returns network address"
x = self._available_networks.pop()
self._allocated_networks.add(x)
return x
def put(self, network):
"put(net_address) - return network address to pool"
x = network
if x not in self._allocated_networks:
raise NetworkPoolException, "Network address '%s' wasn't previously allocated" % str(network)
self._allocated_networks.remove(x)
self._available_networks.add(x)
@property
def is_empty(self):
return len(self._available_networks) == 0
import ipaddr
from itertools import chain
IPv4Address = ipaddr.IPv4Address
IPv4Network = ipaddr.IPv4Network
class NetworkPoolException(Exception): pass
class IpNetworksPool:
def __init__(self, net_addresses=None, prefix=24):
if not net_addresses: net_addresses = ['10.0.0.0/20']
networks = []
for address in net_addresses:
if not isinstance(address, IPv4Network):
address = IPv4Network(str(address))
networks.append(address)
self._available_networks = set(chain(*[net_address.iter_subnets(new_prefix=prefix) for net_address in networks]))
self._allocated_networks = set()
def reserve(self, network):
for overlaping_network in filter(lambda n: n.overlaps(network), self._available_networks):
self._available_networks.remove(overlaping_network)
def get(self):
"get() - allocates and returns network address"
x = self._available_networks.pop()
self._allocated_networks.add(x)
return x
def put(self, network):
"put(net_address) - return network address to pool"
x = network
if x not in self._allocated_networks:
raise NetworkPoolException, "Network address '%s' wasn't previously allocated" % str(network)
self._allocated_networks.remove(x)
self._available_networks.add(x)
@property
def is_empty(self):
return len(self._available_networks) == 0

View File

@ -1,158 +1,158 @@
import re
import my_yaml
from model import Environment, Network, Node, Disk, Interface, Cdrom
class ConfigError(Exception): pass
def load(stream):
data = my_yaml.load(stream)
if not data.has_key('nodes'):
raise ConfigError, "No nodes defined"
name = 'default'
if data.has_key('name'):
name = data['name']
environment = Environment(name)
for network_data in (data.get('networks') or []):
parse_network(environment, network_data)
for node_data in data['nodes']:
parse_node(environment, node_data)
return environment
def dump(stream):
raise "Not implemented yet"
def parse_network(environment, data):
if data.has_key('name'):
name = data['name']
elif data.has_key('network'):
name = data['network']
else:
raise ConfigError, "Unnamed network"
network = Network(name)
if data.has_key('dhcp_server'):
network.dhcp_server = data['dhcp_server']
for existing_network in environment.networks:
if existing_network.name == network.name:
raise ConfigError, "Network with given name already exists: %s" % network.name
environment.networks.append(network)
return network
def parse_node(environment, data):
if data.has_key('name'):
name = data['name']
elif data.has_key('node'):
name = data['node']
else:
raise ConfigError, "Unnamed node"
node = Node(name)
if data.has_key('cpu'):
node.cpu = data['cpu']
if data.has_key('memory'):
node.memory = data['memory']
if data.has_key('vnc'):
node.vnc = data['vnc']
if data.has_key('cdrom'):
isopath = data['cdrom']
if not isinstance(isopath, (str,)):
raise ConfigError, "It must be string containing path to iso image file"
node.cdrom = Cdrom(isopath)
if data.has_key('disk'):
disks_data = data['disk']
if type(disks_data) != list:
disks_data = (disks_data,)
for disk_data in disks_data:
if type(disk_data) == str:
try:
size = parse_size(disk_data)
node.disks.append(Disk(size=size))
except ValueError:
path = disk_data
node.disks.append(Disk(path=path))
elif isinstance(disk_data, dict):
node.disks.append(Disk(**disk_data))
else:
raise ConfigError, "Unknown disk config: %s" % str(disk_data)
if data.has_key('networks'):
networks_data = data['networks']
if type(networks_data) != list:
networks_data = (networks_data,)
for network_data in networks_data:
if type(network_data) == str:
network = None
for n in environment.networks:
if n.name == network_data:
network = n
break
# Inline networks
# if network is None:
# network = parse_network(environment, {'name': network_data})
# self.networks.append(network)
# TODO: add support for specifying additional network interface params (e.g. mac address)
if network is None:
raise ConfigError, "Unknown network %s" % network_data
node.interfaces.append(Interface(network))
if data.has_key('boot'):
boot_data = data['boot']
if type(boot_data) != list:
boot_data = list(boot_data)
for boot in boot_data:
if not boot in ('disk', 'network', 'cdrom'):
raise ConfigError, "Unknown boot option: %s" % boot
node.boot.append(boot)
else:
if len(node.disks) > 0: node.boot.append('disk')
if node.cdrom : node.boot.append('cdrom')
if len(node.interfaces) > 0: node.boot.append('network')
for existing_node in environment.nodes:
if existing_node.name == node.name:
raise ConfigError, "Node with given name already exists: %s" % node.name
environment.nodes.append(node)
SIZE_RE = re.compile('^(\d+)\s*(|kb|k|mb|m|gb|g)$')
def parse_size(s):
m = SIZE_RE.match(s.lower())
if not m:
raise ValueError, "Invalid size format: %s" % s
value = int(m.group(1))
units = m.group(2)
if units in ['k', 'kb']: multiplier=1024
elif units in ['m', 'mb']: multiplier=1024**2
elif units in ['g', 'gb']: multiplier=1024**3
elif units in ['t', 'tb']: multiplier=1024**4
elif units == '': multiplier=1
else: raise ValueError, "Invalid size format: %s" % units
return value * multiplier
import re
import my_yaml
from model import Environment, Network, Node, Disk, Interface, Cdrom
class ConfigError(Exception): pass
def load(stream):
data = my_yaml.load(stream)
if not data.has_key('nodes'):
raise ConfigError, "No nodes defined"
name = 'default'
if data.has_key('name'):
name = data['name']
environment = Environment(name)
for network_data in (data.get('networks') or []):
parse_network(environment, network_data)
for node_data in data['nodes']:
parse_node(environment, node_data)
return environment
def dump(stream):
raise "Not implemented yet"
def parse_network(environment, data):
if data.has_key('name'):
name = data['name']
elif data.has_key('network'):
name = data['network']
else:
raise ConfigError, "Unnamed network"
network = Network(name)
if data.has_key('dhcp_server'):
network.dhcp_server = data['dhcp_server']
for existing_network in environment.networks:
if existing_network.name == network.name:
raise ConfigError, "Network with given name already exists: %s" % network.name
environment.networks.append(network)
return network
def parse_node(environment, data):
if data.has_key('name'):
name = data['name']
elif data.has_key('node'):
name = data['node']
else:
raise ConfigError, "Unnamed node"
node = Node(name)
if data.has_key('cpu'):
node.cpu = data['cpu']
if data.has_key('memory'):
node.memory = data['memory']
if data.has_key('vnc'):
node.vnc = data['vnc']
if data.has_key('cdrom'):
isopath = data['cdrom']
if not isinstance(isopath, (str,)):
raise ConfigError, "It must be string containing path to iso image file"
node.cdrom = Cdrom(isopath)
if data.has_key('disk'):
disks_data = data['disk']
if type(disks_data) != list:
disks_data = (disks_data,)
for disk_data in disks_data:
if type(disk_data) == str:
try:
size = parse_size(disk_data)
node.disks.append(Disk(size=size))
except ValueError:
path = disk_data
node.disks.append(Disk(path=path))
elif isinstance(disk_data, dict):
node.disks.append(Disk(**disk_data))
else:
raise ConfigError, "Unknown disk config: %s" % str(disk_data)
if data.has_key('networks'):
networks_data = data['networks']
if type(networks_data) != list:
networks_data = (networks_data,)
for network_data in networks_data:
if type(network_data) == str:
network = None
for n in environment.networks:
if n.name == network_data:
network = n
break
# Inline networks
# if network is None:
# network = parse_network(environment, {'name': network_data})
# self.networks.append(network)
# TODO: add support for specifying additional network interface params (e.g. mac address)
if network is None:
raise ConfigError, "Unknown network %s" % network_data
node.interfaces.append(Interface(network))
if data.has_key('boot'):
boot_data = data['boot']
if type(boot_data) != list:
boot_data = list(boot_data)
for boot in boot_data:
if not boot in ('disk', 'network', 'cdrom'):
raise ConfigError, "Unknown boot option: %s" % boot
node.boot.append(boot)
else:
if len(node.disks) > 0: node.boot.append('disk')
if node.cdrom : node.boot.append('cdrom')
if len(node.interfaces) > 0: node.boot.append('network')
for existing_node in environment.nodes:
if existing_node.name == node.name:
raise ConfigError, "Node with given name already exists: %s" % node.name
environment.nodes.append(node)
SIZE_RE = re.compile('^(\d+)\s*(|kb|k|mb|m|gb|g)$')
def parse_size(s):
m = SIZE_RE.match(s.lower())
if not m:
raise ValueError, "Invalid size format: %s" % s
value = int(m.group(1))
units = m.group(2)
if units in ['k', 'kb']: multiplier=1024
elif units in ['m', 'mb']: multiplier=1024**2
elif units in ['g', 'gb']: multiplier=1024**3
elif units in ['t', 'tb']: multiplier=1024**4
elif units == '': multiplier=1
else: raise ValueError, "Invalid size format: %s" % units
return value * multiplier

View File

@ -1,12 +1,12 @@
from distutils.core import setup
setup(
name='devops',
version='0.1',
description='Library to aid creating and manipulating virtual environments',
author='Mirantis, Inc.',
author_email='product@mirantis.com',
packages=['devops', 'devops.driver'],
scripts=['bin/devops'], requires=['xmlbuilder', "ipaddr", "paramiko"]
)
from distutils.core import setup
setup(
name='devops',
version='0.1',
description='Library to aid creating and manipulating virtual environments',
author='Mirantis, Inc.',
author_email='product@mirantis.com',
packages=['devops', 'devops.driver'],
scripts=['bin/devops'], requires=['xmlbuilder', "ipaddr", "paramiko"]
)

View File

@ -1,134 +1,134 @@
import os
import urllib2
import logging
from unittest import TestCase
import paramiko
import posixpath
logger = logging.getLogger('helpers')
"""
Integration test helpers
"""
class HTTPClient(object):
def __init__(self):
self.opener = urllib2.build_opener(urllib2.HTTPHandler)
def get(self, url, log=False):
req = urllib2.Request(url)
return self._open(req, log)
def post(self, url, data="{}", content_type="application/json", log=False):
req = urllib2.Request(url, data=data)
req.add_header('Content-Type', content_type)
return self._open(req, log)
def put(self, url, data="{}", content_type="application/json", log=False):
req = urllib2.Request(url, data=data)
req.add_header('Content-Type', content_type)
req.get_method = lambda: 'PUT'
return self._open(req, log)
def _open(self, req, log):
try:
resp = self.opener.open(req)
content = resp.read()
except urllib2.HTTPError, error:
content = ": ".join([str(error.code), error.read()])
if log:
logger.debug(content)
return content
class SSHClient(object):
class get_sudo(object):
def __init__(self, client):
self.client = client
def __enter__(self):
self.client.sudo_mode = True
def __exit__(self, type, value, traceback):
self.client.sudo_mode = False
def __init__(self):
self.channel = None
self.sudo_mode = False
self.sudo = self.get_sudo(self)
self.established = False
def connect_ssh(self, host, username, password):
if not self.established:
self.ssh_client = paramiko.SSHClient()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.host = host
self.username = username
self.password = password
self.ssh_client.connect(host, username=username, password=password)
self.sftp_client = self.ssh_client.open_sftp()
self.established = True
def execute(self, command):
logger.debug("Executing command: '%s'" % command.rstrip())
chan = self.ssh_client.get_transport().open_session()
stdin = chan.makefile('wb')
stdout = chan.makefile('rb')
stderr = chan.makefile_stderr('rb')
cmd = "%s\n" % command
if self.sudo_mode:
cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"')
chan.exec_command(cmd)
if stdout.channel.closed is False:
stdin.write('%s\n' % self.password)
stdin.flush()
result = {
'stdout': [],
'stderr': [],
'exit_status': chan.recv_exit_status()
}
for line in stdout:
result['stdout'].append(line)
for line in stderr:
result['stderr'].append(line)
return result
def mkdir(self, path):
logger.debug("Creating directory: %s" % path)
return self.execute("mkdir %s\n" % path)
def rmdir(self, path):
logger.debug("Removing directory: %s" % path)
return self.execute("rm -rf %s" % path)
def open(self, path, mode='r'):
return self.sftp_client.open(path, mode)
def scp(self, frm, to):
logger.debug("Copying file: %s -> %s" % (frm, to))
self.sftp_client.put(frm, to)
def scp_d(self, frm, to):
logger.debug("Copying directory: %s -> %s" % (frm, to))
remote_root = posixpath.join(
to,
os.path.basename(frm)
)
for root, dirs, fls in os.walk(frm):
rel = os.path.relpath(root, frm).replace('\\','/')
if rel == ".":
curdir = remote_root
else:
curdir = posixpath.join(remote_root, rel)
self.mkdir(curdir)
for fl in fls:
self.scp(
os.path.join(root, fl),
posixpath.join(curdir, fl)
)
def disconnect(self):
self.sftp_client.close()
self.ssh_client.close()
self.established = False
import os
import urllib2
import logging
from unittest import TestCase
import paramiko
import posixpath
logger = logging.getLogger('helpers')
"""
Integration test helpers
"""
class HTTPClient(object):
def __init__(self):
self.opener = urllib2.build_opener(urllib2.HTTPHandler)
def get(self, url, log=False):
req = urllib2.Request(url)
return self._open(req, log)
def post(self, url, data="{}", content_type="application/json", log=False):
req = urllib2.Request(url, data=data)
req.add_header('Content-Type', content_type)
return self._open(req, log)
def put(self, url, data="{}", content_type="application/json", log=False):
req = urllib2.Request(url, data=data)
req.add_header('Content-Type', content_type)
req.get_method = lambda: 'PUT'
return self._open(req, log)
def _open(self, req, log):
try:
resp = self.opener.open(req)
content = resp.read()
except urllib2.HTTPError, error:
content = ": ".join([str(error.code), error.read()])
if log:
logger.debug(content)
return content
class SSHClient(object):
class get_sudo(object):
def __init__(self, client):
self.client = client
def __enter__(self):
self.client.sudo_mode = True
def __exit__(self, type, value, traceback):
self.client.sudo_mode = False
def __init__(self):
self.channel = None
self.sudo_mode = False
self.sudo = self.get_sudo(self)
self.established = False
def connect_ssh(self, host, username, password):
if not self.established:
self.ssh_client = paramiko.SSHClient()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.host = host
self.username = username
self.password = password
self.ssh_client.connect(host, username=username, password=password)
self.sftp_client = self.ssh_client.open_sftp()
self.established = True
def execute(self, command):
logger.debug("Executing command: '%s'" % command.rstrip())
chan = self.ssh_client.get_transport().open_session()
stdin = chan.makefile('wb')
stdout = chan.makefile('rb')
stderr = chan.makefile_stderr('rb')
cmd = "%s\n" % command
if self.sudo_mode:
cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"')
chan.exec_command(cmd)
if stdout.channel.closed is False:
stdin.write('%s\n' % self.password)
stdin.flush()
result = {
'stdout': [],
'stderr': [],
'exit_status': chan.recv_exit_status()
}
for line in stdout:
result['stdout'].append(line)
for line in stderr:
result['stderr'].append(line)
return result
def mkdir(self, path):
logger.debug("Creating directory: %s" % path)
return self.execute("mkdir %s\n" % path)
def rmdir(self, path):
logger.debug("Removing directory: %s" % path)
return self.execute("rm -rf %s" % path)
def open(self, path, mode='r'):
return self.sftp_client.open(path, mode)
def scp(self, frm, to):
logger.debug("Copying file: %s -> %s" % (frm, to))
self.sftp_client.put(frm, to)
def scp_d(self, frm, to):
logger.debug("Copying directory: %s -> %s" % (frm, to))
remote_root = posixpath.join(
to,
os.path.basename(frm)
)
for root, dirs, fls in os.walk(frm):
rel = os.path.relpath(root, frm).replace('\\','/')
if rel == ".":
curdir = remote_root
else:
curdir = posixpath.join(remote_root, rel)
self.mkdir(curdir)
for fl in fls:
self.scp(
os.path.join(root, fl),
posixpath.join(curdir, fl)
)
def disconnect(self):
self.sftp_client.close()
self.ssh_client.close()
self.established = False

View File

@ -1,159 +1,159 @@
import time, os
from devops.model import Environment, Network, Node, Disk, Cdrom, Interface
from devops.controller import Controller
from devops.driver.libvirt import Libvirt
from devops.helpers import tcp_ping, wait, TimeoutError
import traceback
import logging
import devops
logger = logging.getLogger('integration')
class Ci(object):
hostname = 'nailgun'
domain = 'mirantis.com'
installation_timeout = 1800
chef_timeout = 600
def __init__(self, cache_file=None, iso=None):
self.environment_cache_file = cache_file
self.iso = iso
self.environment = None
if self.environment_cache_file and os.path.exists(self.environment_cache_file):
logger.info("Loading existing integration environment...")
with file(self.environment_cache_file) as f:
environment_id = f.read()
try:
self.environment = devops.load(environment_id)
logger.info("Successfully loaded existing environment")
except Exception, e:
logger.error("Failed to load existing integration environment: " + str(e) + "\n" + traceback.format_exc())
pass
def setup_environment(self):
if self.environment:
return True
if not self.iso:
logger.critical("ISO path missing while trying to build integration environment")
return False
logger.info("Building integration environment")
try:
environment = Environment('integration')
network = Network('default')
environment.networks.append(network)
node = Node('admin')
node.memory = 2048
node.vnc = True
node.disks.append(Disk(size=30*1024**3))
node.interfaces.append(Interface(network))
node.cdrom = Cdrom(isopath=self.iso)
node.boot = ['disk', 'cdrom']
environment.nodes.append(node)
node2 = Node('slave')
node2.memory = 2048
node2.vnc = True
node2.disks.append(Disk(size=30*1024**3))
node2.interfaces.append(Interface(network))
node2.boot = ['network']
environment.nodes.append(node2)
devops.build(environment)
except Exception, e:
logger.error("Failed to build environment: %s\n%s" % (str(e), traceback.format_exc()))
return False
self.environment = environment
try:
node.interfaces[0].ip_addresses = network.ip_addresses[2]
logger.info("Starting admin node")
node.start()
logger.info("Waiting admin node installation software to boot")
# todo await
time.sleep(10)
logger.info("Executing admin node software installation")
node.send_keys("""<Esc><Enter>
<Wait>
/install/vmlinuz initrd=/install/initrd.gz
priority=critical
locale=en_US
file=/cdrom/preseed/manual.seed
vga=788
netcfg/get_ipaddress=%(ip)s
netcfg/get_netmask=%(mask)s
netcfg/get_gateway=%(gw)s
netcfg/get_nameservers=%(gw)s
netcfg/confirm_static=true
netcfg/get_hostname=%(hostname)s
netcfg/get_domai=%(domain)s
<Enter>
""" % { 'ip': node.ip_address,
'mask': network.ip_addresses.netmask,
'gw': network.ip_addresses[1],
'hostname': self.hostname,
'domain': self.domain})
logger.info("Waiting for completion of admin node software installation")
wait(lambda: tcp_ping(node.ip_address, 22), timeout=self.installation_timeout)
logger.info("Got SSH access to admin node, waiting for ports 80 and 8000 to open")
wait(lambda: tcp_ping(node.ip_address, 80) and tcp_ping(node.ip_address, 8000), timeout=self.chef_timeout)
logger.info("Admin node software is installed and ready for use")
devops.save(self.environment)
try:
os.makedirs(os.path.dirname(self.environment_cache_file))
except OSError as e:
logger.warning("Error occured while creating directory: %s", os.path.dirname(self.environment_cache_file))
with file(self.environment_cache_file, 'w') as f:
f.write(self.environment.id)
logger.info("Environment has been saved")
except Exception, e:
devops.save(self.environment)
cache_file = self.environment_cache_file + '.candidate'
try:
os.makedirs(os.path.dirname(cache_file))
except OSError:
logger.warning("Exception occured while making directory: %s" % os.path.dirname(cache_file))
with file(cache_file, 'w') as f:
f.write(self.environment.id)
logger.error("Failed to build environment. Candidate environment cache file is %s" % cache_file)
return False
return True
def destroy_environment(self):
if self.environment:
devops.destroy(self.environment)
if self.environment_cache_file and os.path.exists(self.environment_cache_file):
os.remove(self.environment_cache_file)
return True
ci = None
def setUp():
if not ci.setup_environment():
raise Exception, "Failed to setup integration environment"
def tearDown():
if not ci.environment_cache_file:
ci.destroy_environment()
import time, os
from devops.model import Environment, Network, Node, Disk, Cdrom, Interface
from devops.controller import Controller
from devops.driver.libvirt import Libvirt
from devops.helpers import tcp_ping, wait, TimeoutError
import traceback
import logging
import devops
logger = logging.getLogger('integration')
class Ci(object):
hostname = 'nailgun'
domain = 'mirantis.com'
installation_timeout = 1800
chef_timeout = 600
def __init__(self, cache_file=None, iso=None):
self.environment_cache_file = cache_file
self.iso = iso
self.environment = None
if self.environment_cache_file and os.path.exists(self.environment_cache_file):
logger.info("Loading existing integration environment...")
with file(self.environment_cache_file) as f:
environment_id = f.read()
try:
self.environment = devops.load(environment_id)
logger.info("Successfully loaded existing environment")
except Exception, e:
logger.error("Failed to load existing integration environment: " + str(e) + "\n" + traceback.format_exc())
pass
def setup_environment(self):
if self.environment:
return True
if not self.iso:
logger.critical("ISO path missing while trying to build integration environment")
return False
logger.info("Building integration environment")
try:
environment = Environment('integration')
network = Network('default')
environment.networks.append(network)
node = Node('admin')
node.memory = 2048
node.vnc = True
node.disks.append(Disk(size=30*1024**3))
node.interfaces.append(Interface(network))
node.cdrom = Cdrom(isopath=self.iso)
node.boot = ['disk', 'cdrom']
environment.nodes.append(node)
node2 = Node('slave')
node2.memory = 2048
node2.vnc = True
node2.disks.append(Disk(size=30*1024**3))
node2.interfaces.append(Interface(network))
node2.boot = ['network']
environment.nodes.append(node2)
devops.build(environment)
except Exception, e:
logger.error("Failed to build environment: %s\n%s" % (str(e), traceback.format_exc()))
return False
self.environment = environment
try:
node.interfaces[0].ip_addresses = network.ip_addresses[2]
logger.info("Starting admin node")
node.start()
logger.info("Waiting admin node installation software to boot")
# todo await
time.sleep(10)
logger.info("Executing admin node software installation")
node.send_keys("""<Esc><Enter>
<Wait>
/install/vmlinuz initrd=/install/initrd.gz
priority=critical
locale=en_US
file=/cdrom/preseed/manual.seed
vga=788
netcfg/get_ipaddress=%(ip)s
netcfg/get_netmask=%(mask)s
netcfg/get_gateway=%(gw)s
netcfg/get_nameservers=%(gw)s
netcfg/confirm_static=true
netcfg/get_hostname=%(hostname)s
netcfg/get_domai=%(domain)s
<Enter>
""" % { 'ip': node.ip_address,
'mask': network.ip_addresses.netmask,
'gw': network.ip_addresses[1],
'hostname': self.hostname,
'domain': self.domain})
logger.info("Waiting for completion of admin node software installation")
wait(lambda: tcp_ping(node.ip_address, 22), timeout=self.installation_timeout)
logger.info("Got SSH access to admin node, waiting for ports 80 and 8000 to open")
wait(lambda: tcp_ping(node.ip_address, 80) and tcp_ping(node.ip_address, 8000), timeout=self.chef_timeout)
logger.info("Admin node software is installed and ready for use")
devops.save(self.environment)
try:
os.makedirs(os.path.dirname(self.environment_cache_file))
except OSError as e:
logger.warning("Error occured while creating directory: %s", os.path.dirname(self.environment_cache_file))
with file(self.environment_cache_file, 'w') as f:
f.write(self.environment.id)
logger.info("Environment has been saved")
except Exception, e:
devops.save(self.environment)
cache_file = self.environment_cache_file + '.candidate'
try:
os.makedirs(os.path.dirname(cache_file))
except OSError:
logger.warning("Exception occured while making directory: %s" % os.path.dirname(cache_file))
with file(cache_file, 'w') as f:
f.write(self.environment.id)
logger.error("Failed to build environment. Candidate environment cache file is %s" % cache_file)
return False
return True
def destroy_environment(self):
if self.environment:
devops.destroy(self.environment)
if self.environment_cache_file and os.path.exists(self.environment_cache_file):
os.remove(self.environment_cache_file)
return True
ci = None
def setUp():
if not ci.setup_environment():
raise Exception, "Failed to setup integration environment"
def tearDown():
if not ci.environment_cache_file:
ci.destroy_environment()

View File

@ -1,22 +1,22 @@
from unittest.case import TestCase
from integration import ci
from helpers import HTTPClient
import logging
logging.basicConfig(format=':%(lineno)d: %(asctime)s %(message)s', level=logging.DEBUG)
class Base(TestCase):
client = HTTPClient()
def get_admin_node_ip(self):
if ci is not None:
return ci.environment.node['admin'].ip_address
else:
return "10.20.0.2"
def get_id_by_mac(self, mac_address):
return mac_address.replace(":", "").upper()
# def getSlaveNodeIp(self, getIdByMac):
from unittest.case import TestCase
from integration import ci
from helpers import HTTPClient
import logging
logging.basicConfig(format=':%(lineno)d: %(asctime)s %(message)s', level=logging.DEBUG)
class Base(TestCase):
client = HTTPClient()
def get_admin_node_ip(self):
if ci is not None:
return ci.environment.node['admin'].ip_address
else:
return "10.20.0.2"
def get_id_by_mac(self, mac_address):
return mac_address.replace(":", "").upper()
# def getSlaveNodeIp(self, getIdByMac):

View File

@ -1,47 +1,47 @@
import logging
import xmlrpclib
from time import sleep
from devops.helpers import wait, tcp_ping, http, ssh
from integration.base import Base
from helpers import SSHClient
class TestCobbler(Base):
def __init__(self, *args, **kwargs):
super(TestCobbler, self).__init__(*args, **kwargs)
self.remote = SSHClient()
self.logpath = "/var/log/chef/solo.log"
self.str_success = "Report handlers complete"
def setUp(self):
self.ip = self.get_admin_node_ip()
def tearDown(self):
pass
def test_cobbler_alive(self):
logging.info("Waiting for handlers to complete")
self.remote.connect_ssh(str(self.ip), "ubuntu", "r00tme")
count = 0
while True:
res = self.remote.execute("grep '%s' '%s'" % (self.str_success, self.logpath))
count += 1
if not res['exit_status']:
break
sleep(5)
if count == 200:
raise Exception("Chef handlers failed to complete")
self.remote.disconnect()
wait(lambda: http(host=self.ip, url='/cobbler_api', waited_code=501), timeout=60)
server = xmlrpclib.Server('http://%s/cobbler_api' % self.ip)
token = server.login('cobbler', 'cobbler')
assert server.ping() == True
import logging
import xmlrpclib
from time import sleep
from devops.helpers import wait, tcp_ping, http, ssh
from integration.base import Base
from helpers import SSHClient
class TestCobbler(Base):
def __init__(self, *args, **kwargs):
super(TestCobbler, self).__init__(*args, **kwargs)
self.remote = SSHClient()
self.logpath = "/var/log/chef/solo.log"
self.str_success = "Report handlers complete"
def setUp(self):
self.ip = self.get_admin_node_ip()
def tearDown(self):
pass
def test_cobbler_alive(self):
logging.info("Waiting for handlers to complete")
self.remote.connect_ssh(str(self.ip), "ubuntu", "r00tme")
count = 0
while True:
res = self.remote.execute("grep '%s' '%s'" % (self.str_success, self.logpath))
count += 1
if not res['exit_status']:
break
sleep(5)
if count == 200:
raise Exception("Chef handlers failed to complete")
self.remote.disconnect()
wait(lambda: http(host=self.ip, url='/cobbler_api', waited_code=501), timeout=60)
server = xmlrpclib.Server('http://%s/cobbler_api' % self.ip)
token = server.login('cobbler', 'cobbler')
assert server.ping() == True

View File

@ -1,280 +1,280 @@
import shlex
import os
import sys
import logging
import time
import json
import urllib2
import pprint
from unittest import TestCase
from subprocess import Popen, PIPE
#import posixpath
import paramiko
import posixpath
from devops.helpers import wait, tcp_ping, http
from integration import ci
from integration.base import Base
from helpers import HTTPClient, SSHClient
from root import root
logging.basicConfig(format=':%(lineno)d: %(asctime)s %(message)s', level=logging.DEBUG)
AGENT_PATH = root("bin", "agent")
DEPLOY_PATH = root("bin", "deploy")
COOKBOOKS_PATH = root("cooks", "cookbooks")
SAMPLE_PATH = root("scripts", "ci")
SAMPLE_REMOTE_PATH = "/home/ubuntu"
class StillPendingException(Exception):
pass
class TestNode(Base):
def __init__(self, *args, **kwargs):
super(TestNode, self).__init__(*args, **kwargs)
self.remote = SSHClient()
self.admin_host = None
self.admin_user = "ubuntu"
self.admin_passwd = "r00tme"
self.slave_host = None
self.slave_user = "root"
self.slave_passwd = "r00tme"
self.release_id = None
def setUp(self):
self.ip = self.get_admin_node_ip()
self.admin_host = self.ip
cookbook_remote_path = posixpath.join(SAMPLE_REMOTE_PATH, "sample-cook")
mysql_remote_path = posixpath.join(COOKBOOKS_PATH, "mysql")
release_remote_path = posixpath.join(SAMPLE_REMOTE_PATH, "sample-release.json")
self.remote.connect_ssh(self.admin_host, self.admin_user, self.admin_passwd)
self.remote.rmdir(cookbook_remote_path)
self.remote.rmdir(os.path.join(SAMPLE_REMOTE_PATH, "cookbooks"))
self.remote.rmdir(os.path.join(SAMPLE_REMOTE_PATH, "solo"))
self.remote.scp(
os.path.join(SAMPLE_PATH, "sample-release.json"),
release_remote_path
)
self.remote.mkdir(os.path.join(SAMPLE_REMOTE_PATH, "solo"))
self.remote.mkdir(os.path.join(SAMPLE_REMOTE_PATH, "solo/config"))
self.remote.scp_d(
os.path.join(SAMPLE_PATH, "sample-cook"),
SAMPLE_REMOTE_PATH
)
self.remote.scp_d(
COOKBOOKS_PATH,
SAMPLE_REMOTE_PATH
)
attempts = 0
while True:
releases = json.loads(self.client.get(
"http://%s:8000/api/releases/" % self.admin_host
))
for r in releases:
logging.debug("Found release name: %s" % r["name"])
if r["name"] == "Sample release":
logging.debug("Sample release id: %s" % r["id"])
self.release_id = r["id"]
break
if self.release_id:
break
if attempts >= 1:
raise Exception("Release is not found")
logging.error("Sample release is not found. Trying to upload")
with self.remote.sudo:
cmd = "/opt/nailgun/bin/create_release -f %s" % \
release_remote_path
logging.info("Launching command: %s" % cmd)
res = self.remote.execute(cmd)
if res['exit_status'] != 0:
self.remote.disconnect()
raise Exception("Command failed: %s" % str(res))
attempts += 1
# todo install_cookbook always return 0
commands = [
"/opt/nailgun/bin/install_cookbook %s" % cookbook_remote_path
]
with self.remote.sudo:
for cmd in commands:
logging.info("Launching command: %s" % cmd)
res = self.remote.execute(cmd)
logging.debug("Command result: %s" % pprint.pformat(res))
if res['exit_status']:
self.remote.disconnect()
raise Exception("Command failed: %s" % str(res))
self.remote.disconnect()
def test_node_deploy(self):
try:
self.get_slave_id()
except :
pass
timer = time.time()
timeout = 600
while True:
node = self.get_slave_node(self.get_slave_id())
if node is not None:
logging.info("Node found")
self.slave_host = node["ip"]
break
else:
logging.info("Node not found")
if (time.time() - timer) > timeout:
raise Exception("Slave node agent failed to execute!")
time.sleep(15)
logging.info("Waiting for slave agent to run...")
try:
cluster = json.loads(self.client.get(
"http://%s:8000/api/clusters/1" % self.admin_host
))
except ValueError:
logging.info("No clusters found - creating test cluster...")
cluster = self.client.post(
"http://%s:8000/api/clusters" % self.admin_host,
data='{ "name": "MyOwnPrivateCluster", "release": %s }' % \
self.release_id, log=True
)
cluster = json.loads(cluster)
resp = json.loads(self.client.put(
"http://%s:8000/api/clusters/1" % self.admin_host,
data='{ "nodes": ["%s"] }' % self.slave_id
))
cluster = json.loads(self.client.get(
"http://%s:8000/api/clusters/1" % self.admin_host
))
if not len(cluster["nodes"]):
raise ValueError("Failed to add node into cluster")
roles_uploaded = json.loads(self.client.get(
"http://%s:8000/api/roles?release_id=%s" % \
(self.admin_host, self.release_id)
))
"""
FIXME
WILL BE CHANGED WHEN RENDERING WILL BE REWRITTEN
"""
roles_ids = [
role["id"] for role in roles_uploaded
]
resp = json.loads(self.client.put(
"http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id),
data='{ "new_roles": %s, "redeployment_needed": true }' % str(roles_ids)
))
if not len(resp["new_roles"]):
raise ValueError("Failed to assign roles to node")
if node["status"] == "discover":
logging.info("Node booted with bootstrap image.")
elif node["status"] == "ready":
logging.info("Node already installed.")
self._slave_delete_test_file()
logging.info("Provisioning...")
task = json.loads(self.client.put(
"http://%s:8000/api/clusters/1/changes/" % self.admin_host,
log=True
))
task_id = task['task_id']
logging.info("Task created: %s" % task_id)
logging.info("Waiting for completion of slave node software installation")
timer = time.time()
timeout = 1800
while True:
try:
task = self.client.get(
"http://%s:8000/api/tasks/%s/" % (self.admin_host, task_id)
)
logging.info(str(task))
task = json.loads(task)
if not task['ready']:
raise StillPendingException("Task %s is still pending")
if task.get('error'):
raise Exception(
"Task %s failed!\n %s" %
(task['task_id'], str(task)),
)
break
except StillPendingException:
if (time.time() - timer) > timeout:
raise Exception("Task pending timeout!")
time.sleep(30)
node = json.loads(self.client.get(
"http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id)
))
self.slave_host = node["ip"]
logging.info("Waiting for SSH access on %s" % self.slave_host)
wait(lambda: tcp_ping(self.slave_host, 22), timeout=1800)
self.remote.connect_ssh(self.slave_host, self.slave_user, self.slave_passwd)
# check if recipes executed
ret = self.remote.execute("test -f /tmp/chef_success")
if ret['exit_status']:
raise Exception("Recipes failed to execute!")
# check mysql running
#db = MySQLdb.connect(passwd="test", user="root", host=self.slave_host)
#print db
# check recipes execution order
ret = self.remote.execute("cat /tmp/chef_success")
if [out.strip() for out in ret['stdout']] != ['monitor', 'default', 'compute']:
raise Exception("Recipes executed in a wrong order: %s!" \
% str(ret['stdout']))
# chech node status
node = json.loads(self.client.get(
"http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id)
))
self.assertEqual(node["status"], "ready")
self.remote.disconnect()
def _slave_delete_test_file(self):
logging.info("Deleting test file...")
slave_client = SSHClient()
slave_client.connect_ssh(self.slave_host, self.slave_user, self.slave_passwd)
res = slave_client.execute("rm -rf /tmp/chef_success")
slave_client.disconnect()
# create node with predefined mac address
def get_slave_id(self):
if hasattr(self,"slave_id"): return self.slave_id
if ci is not None:
slave = ci.environment.node['slave']
slave_id = self.get_id_by_mac(slave.interfaces[0].mac_address)
logging.info("Starting slave node")
slave.start()
logging.info("Nailgun IP: %s" % self.admin_host)
else:
response = self.client.get(
"http://%s:8000/api/nodes" % self.admin_host
)
last_node = json.loads(response)[-1]
slave_id = self.get_id_by_mac(last_node['mac'])
self.slave_id = slave_id
return slave_id
def get_slave_node(self, slave_id):
response = self.client.get(
"http://%s:8000/api/nodes/%s" % (self.admin_host, slave_id)
)
if response.startswith("404"):
return None
return json.loads(response)
import shlex
import os
import sys
import logging
import time
import json
import urllib2
import pprint
from unittest import TestCase
from subprocess import Popen, PIPE
#import posixpath
import paramiko
import posixpath
from devops.helpers import wait, tcp_ping, http
from integration import ci
from integration.base import Base
from helpers import HTTPClient, SSHClient
from root import root
logging.basicConfig(format=':%(lineno)d: %(asctime)s %(message)s', level=logging.DEBUG)
AGENT_PATH = root("bin", "agent")
DEPLOY_PATH = root("bin", "deploy")
COOKBOOKS_PATH = root("cooks", "cookbooks")
SAMPLE_PATH = root("scripts", "ci")
SAMPLE_REMOTE_PATH = "/home/ubuntu"
class StillPendingException(Exception):
pass
class TestNode(Base):
def __init__(self, *args, **kwargs):
super(TestNode, self).__init__(*args, **kwargs)
self.remote = SSHClient()
self.admin_host = None
self.admin_user = "ubuntu"
self.admin_passwd = "r00tme"
self.slave_host = None
self.slave_user = "root"
self.slave_passwd = "r00tme"
self.release_id = None
def setUp(self):
self.ip = self.get_admin_node_ip()
self.admin_host = self.ip
cookbook_remote_path = posixpath.join(SAMPLE_REMOTE_PATH, "sample-cook")
mysql_remote_path = posixpath.join(COOKBOOKS_PATH, "mysql")
release_remote_path = posixpath.join(SAMPLE_REMOTE_PATH, "sample-release.json")
self.remote.connect_ssh(self.admin_host, self.admin_user, self.admin_passwd)
self.remote.rmdir(cookbook_remote_path)
self.remote.rmdir(os.path.join(SAMPLE_REMOTE_PATH, "cookbooks"))
self.remote.rmdir(os.path.join(SAMPLE_REMOTE_PATH, "solo"))
self.remote.scp(
os.path.join(SAMPLE_PATH, "sample-release.json"),
release_remote_path
)
self.remote.mkdir(os.path.join(SAMPLE_REMOTE_PATH, "solo"))
self.remote.mkdir(os.path.join(SAMPLE_REMOTE_PATH, "solo/config"))
self.remote.scp_d(
os.path.join(SAMPLE_PATH, "sample-cook"),
SAMPLE_REMOTE_PATH
)
self.remote.scp_d(
COOKBOOKS_PATH,
SAMPLE_REMOTE_PATH
)
attempts = 0
while True:
releases = json.loads(self.client.get(
"http://%s:8000/api/releases/" % self.admin_host
))
for r in releases:
logging.debug("Found release name: %s" % r["name"])
if r["name"] == "Sample release":
logging.debug("Sample release id: %s" % r["id"])
self.release_id = r["id"]
break
if self.release_id:
break
if attempts >= 1:
raise Exception("Release is not found")
logging.error("Sample release is not found. Trying to upload")
with self.remote.sudo:
cmd = "/opt/nailgun/bin/create_release -f %s" % \
release_remote_path
logging.info("Launching command: %s" % cmd)
res = self.remote.execute(cmd)
if res['exit_status'] != 0:
self.remote.disconnect()
raise Exception("Command failed: %s" % str(res))
attempts += 1
# todo install_cookbook always return 0
commands = [
"/opt/nailgun/bin/install_cookbook %s" % cookbook_remote_path
]
with self.remote.sudo:
for cmd in commands:
logging.info("Launching command: %s" % cmd)
res = self.remote.execute(cmd)
logging.debug("Command result: %s" % pprint.pformat(res))
if res['exit_status']:
self.remote.disconnect()
raise Exception("Command failed: %s" % str(res))
self.remote.disconnect()
def test_node_deploy(self):
try:
self.get_slave_id()
except :
pass
timer = time.time()
timeout = 600
while True:
node = self.get_slave_node(self.get_slave_id())
if node is not None:
logging.info("Node found")
self.slave_host = node["ip"]
break
else:
logging.info("Node not found")
if (time.time() - timer) > timeout:
raise Exception("Slave node agent failed to execute!")
time.sleep(15)
logging.info("Waiting for slave agent to run...")
try:
cluster = json.loads(self.client.get(
"http://%s:8000/api/clusters/1" % self.admin_host
))
except ValueError:
logging.info("No clusters found - creating test cluster...")
cluster = self.client.post(
"http://%s:8000/api/clusters" % self.admin_host,
data='{ "name": "MyOwnPrivateCluster", "release": %s }' % \
self.release_id, log=True
)
cluster = json.loads(cluster)
resp = json.loads(self.client.put(
"http://%s:8000/api/clusters/1" % self.admin_host,
data='{ "nodes": ["%s"] }' % self.slave_id
))
cluster = json.loads(self.client.get(
"http://%s:8000/api/clusters/1" % self.admin_host
))
if not len(cluster["nodes"]):
raise ValueError("Failed to add node into cluster")
roles_uploaded = json.loads(self.client.get(
"http://%s:8000/api/roles?release_id=%s" % \
(self.admin_host, self.release_id)
))
"""
FIXME
WILL BE CHANGED WHEN RENDERING WILL BE REWRITTEN
"""
roles_ids = [
role["id"] for role in roles_uploaded
]
resp = json.loads(self.client.put(
"http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id),
data='{ "new_roles": %s, "redeployment_needed": true }' % str(roles_ids)
))
if not len(resp["new_roles"]):
raise ValueError("Failed to assign roles to node")
if node["status"] == "discover":
logging.info("Node booted with bootstrap image.")
elif node["status"] == "ready":
logging.info("Node already installed.")
self._slave_delete_test_file()
logging.info("Provisioning...")
task = json.loads(self.client.put(
"http://%s:8000/api/clusters/1/changes/" % self.admin_host,
log=True
))
task_id = task['task_id']
logging.info("Task created: %s" % task_id)
logging.info("Waiting for completion of slave node software installation")
timer = time.time()
timeout = 1800
while True:
try:
task = self.client.get(
"http://%s:8000/api/tasks/%s/" % (self.admin_host, task_id)
)
logging.info(str(task))
task = json.loads(task)
if not task['ready']:
raise StillPendingException("Task %s is still pending")
if task.get('error'):
raise Exception(
"Task %s failed!\n %s" %
(task['task_id'], str(task)),
)
break
except StillPendingException:
if (time.time() - timer) > timeout:
raise Exception("Task pending timeout!")
time.sleep(30)
node = json.loads(self.client.get(
"http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id)
))
self.slave_host = node["ip"]
logging.info("Waiting for SSH access on %s" % self.slave_host)
wait(lambda: tcp_ping(self.slave_host, 22), timeout=1800)
self.remote.connect_ssh(self.slave_host, self.slave_user, self.slave_passwd)
# check if recipes executed
ret = self.remote.execute("test -f /tmp/chef_success")
if ret['exit_status']:
raise Exception("Recipes failed to execute!")
# check mysql running
#db = MySQLdb.connect(passwd="test", user="root", host=self.slave_host)
#print db
# check recipes execution order
ret = self.remote.execute("cat /tmp/chef_success")
if [out.strip() for out in ret['stdout']] != ['monitor', 'default', 'compute']:
raise Exception("Recipes executed in a wrong order: %s!" \
% str(ret['stdout']))
# chech node status
node = json.loads(self.client.get(
"http://%s:8000/api/nodes/%s" % (self.admin_host, self.slave_id)
))
self.assertEqual(node["status"], "ready")
self.remote.disconnect()
def _slave_delete_test_file(self):
logging.info("Deleting test file...")
slave_client = SSHClient()
slave_client.connect_ssh(self.slave_host, self.slave_user, self.slave_passwd)
res = slave_client.execute("rm -rf /tmp/chef_success")
slave_client.disconnect()
# create node with predefined mac address
def get_slave_id(self):
if hasattr(self,"slave_id"): return self.slave_id
if ci is not None:
slave = ci.environment.node['slave']
slave_id = self.get_id_by_mac(slave.interfaces[0].mac_address)
logging.info("Starting slave node")
slave.start()
logging.info("Nailgun IP: %s" % self.admin_host)
else:
response = self.client.get(
"http://%s:8000/api/nodes" % self.admin_host
)
last_node = json.loads(response)[-1]
slave_id = self.get_id_by_mac(last_node['mac'])
self.slave_id = slave_id
return slave_id
def get_slave_node(self, slave_id):
response = self.client.get(
"http://%s:8000/api/nodes/%s" % (self.admin_host, slave_id)
)
if response.startswith("404"):
return None
return json.loads(response)

View File

@ -1,82 +1,82 @@
import os.path
import sys
import logging
import argparse
from nose.plugins.manager import PluginManager
from nose.plugins.xunit import Xunit
from root import root
sys.path[:0] = [
root('devops'),
]
import cookbooks
import integration
def main():
parser = argparse.ArgumentParser(description="Integration test suite")
parser.add_argument("-i", "--iso", dest="iso",
help="iso image path or http://url")
parser.add_argument("-l", "--level", dest="log_level", type=str,
help="log level", choices=["DEBUG", "INFO", "WARNING", "ERROR"],
default="ERROR", metavar="LEVEL")
parser.add_argument('--cache-file', dest='cache_file', type=str,
help='file to store integration environment name')
parser.add_argument('--installation-timeout', dest='installation_timeout', type=int,
help='admin node installation timeout')
parser.add_argument('--chef-timeout', dest='chef_timeout', type=int,
help='admin node chef timeout')
parser.add_argument('--suite', dest='test_suite', type=str,
help='Test suite to run', choices=["integration", "cookbooks"],
default="integration")
parser.add_argument('command', choices=('setup', 'destroy', 'test'), default='test',
help="command to execute")
parser.add_argument('arguments', nargs=argparse.REMAINDER, help='arguments for nose testing framework')
params = parser.parse_args()
numeric_level = getattr(logging, params.log_level.upper())
logging.basicConfig(level=numeric_level)
paramiko_logger = logging.getLogger('paramiko')
paramiko_logger.setLevel(numeric_level+1)
if params.test_suite == 'integration':
suite = integration
elif params.test_suite == 'cookbooks':
suite = cookbooks
suite.ci = suite.Ci(params.cache_file, params.iso)
suite.ci.installation_timeout = getattr(params, 'installation_timeout', 1800)
suite.ci.chef_timeout = getattr(params, 'chef_timeout', 600)
if params.command == 'setup':
result = suite.ci.setup_environment()
elif params.command == 'destroy':
result = suite.ci.destroy_environment()
elif params.command == 'test':
import nose
import nose.config
nc = nose.config.Config()
nc.verbosity = 3
nc.plugins = PluginManager(plugins=[Xunit()])
# Set folder where to process tests
nc.configureWhere(os.path.join(os.path.dirname(os.path.abspath(__file__)), params.test_suite))
nose.main(module=suite, config=nc, argv=[
__file__,
"--with-xunit",
"--xunit-file=nosetests.xml"
]+params.arguments)
result = True
else:
print("Unknown command '%s'" % params.command)
sys.exit(1)
if not result:
sys.exit(1)
if __name__ == "__main__":
main()
import os.path
import sys
import logging
import argparse
from nose.plugins.manager import PluginManager
from nose.plugins.xunit import Xunit
from root import root
sys.path[:0] = [
root('devops'),
]
import cookbooks
import integration
def main():
parser = argparse.ArgumentParser(description="Integration test suite")
parser.add_argument("-i", "--iso", dest="iso",
help="iso image path or http://url")
parser.add_argument("-l", "--level", dest="log_level", type=str,
help="log level", choices=["DEBUG", "INFO", "WARNING", "ERROR"],
default="ERROR", metavar="LEVEL")
parser.add_argument('--cache-file', dest='cache_file', type=str,
help='file to store integration environment name')
parser.add_argument('--installation-timeout', dest='installation_timeout', type=int,
help='admin node installation timeout')
parser.add_argument('--chef-timeout', dest='chef_timeout', type=int,
help='admin node chef timeout')
parser.add_argument('--suite', dest='test_suite', type=str,
help='Test suite to run', choices=["integration", "cookbooks"],
default="integration")
parser.add_argument('command', choices=('setup', 'destroy', 'test'), default='test',
help="command to execute")
parser.add_argument('arguments', nargs=argparse.REMAINDER, help='arguments for nose testing framework')
params = parser.parse_args()
numeric_level = getattr(logging, params.log_level.upper())
logging.basicConfig(level=numeric_level)
paramiko_logger = logging.getLogger('paramiko')
paramiko_logger.setLevel(numeric_level+1)
if params.test_suite == 'integration':
suite = integration
elif params.test_suite == 'cookbooks':
suite = cookbooks
suite.ci = suite.Ci(params.cache_file, params.iso)
suite.ci.installation_timeout = getattr(params, 'installation_timeout', 1800)
suite.ci.chef_timeout = getattr(params, 'chef_timeout', 600)
if params.command == 'setup':
result = suite.ci.setup_environment()
elif params.command == 'destroy':
result = suite.ci.destroy_environment()
elif params.command == 'test':
import nose
import nose.config
nc = nose.config.Config()
nc.verbosity = 3
nc.plugins = PluginManager(plugins=[Xunit()])
# Set folder where to process tests
nc.configureWhere(os.path.join(os.path.dirname(os.path.abspath(__file__)), params.test_suite))
nose.main(module=suite, config=nc, argv=[
__file__,
"--with-xunit",
"--xunit-file=nosetests.xml"
]+params.arguments)
result = True
else:
print("Unknown command '%s'" % params.command)
sys.exit(1)
if not result:
sys.exit(1)
if __name__ == "__main__":
main()