Add manager for creating VMs, PEP8 impromevents and some tests.

Test creating VMs (via fuel-devops) and deploying k8s via kargo_deploy.sh (may be found in
    openstack/fuel-ccp-installer project):

  ENV_NAME="<env_name>" IMAGE_PATH="<path_to_qcow2_image>" WORKSPACE=/tmp DEPLOY_SCRIPT="/path/to/kargo_deploy.sh" SUSPEND_ENV_ON_TEARDOWN=false py.test -vvv -s -k "create_vms or env_base"

Change-Id: I78e1d74fed98834d46b41bbc2cae0988a6f0f606
This commit is contained in:
Sergey Lebedev 2016-07-06 17:55:04 +03:00
parent c10104863d
commit 4544f39097
23 changed files with 1523 additions and 110 deletions

View File

@ -12,8 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from mcp_tests.helpers.containers import ContainerEngine
from mcp_tests.helpers.ssh_manager import SSHManager
from mcp_tests.helpers import containers
from mcp_tests.helpers import ssh_manager
class TestBasic(object):
@ -25,8 +25,8 @@ class TestBasic(object):
@property
def ssh_manager(self):
return SSHManager()
return ssh_manager.SSHManager()
@property
def container_engine(self):
return ContainerEngine()
return containers.ContainerEngine()

View File

@ -14,7 +14,10 @@
from __future__ import division
from mcp_tests.logger import logger
from mcp_tests import logger
LOG = logger.logger
def exec_in_container(container, cmd):
@ -49,19 +52,19 @@ class ContainerEngine(object):
def image_exists(self, tag='latest'):
cmd = "docker images | grep {0}| awk '{{print $1}}'".format(
self.image_name)
logger.info('Checking Docker images...')
LOG.info('Checking Docker images...')
result = self.remote.execute(cmd)
logger.debug(result)
LOG.debug(result)
existing_images = [line.strip().split() for line in result['stdout']]
return [self.container_repo, tag] in existing_images
def pull_image(self):
# TODO: add possibility to load image from local path or
# TODO(dtyzhnenko): add possibility to load image from local path or
# remote link provided in settings, in order to speed up downloading
cmd = 'docker pull {0}'.format(self.container_repo)
logger.debug('Downloading Rally repository/image from registry...')
LOG.debug('Downloading Rally repository/image from registry...')
result = self.remote.execute(cmd)
logger.debug(result)
LOG.debug(result)
return self.image_exists()
def run_container_command(self, command, in_background=False):
@ -81,17 +84,20 @@ class ContainerEngine(object):
container_repo=self.container_repo,
tag=self.repository_tag,
command=command))
logger.debug('Executing command "{0}" in Rally container {1}..'.format(
cmd, self.container_repo))
LOG.debug(
'Executing command "{0}" in Rally container {1}..'.format(
cmd, self.container_repo
)
)
result = self.remote.execute(cmd)
logger.debug(result)
LOG.debug(result)
return result
def setup_utils(self):
utils = ['gawk', 'vim', 'curl']
cmd = ('unset http_proxy https_proxy; apt-get update; '
'apt-get install -y {0}'.format(' '.join(utils)))
logger.debug('Installing utils "{0}" to the container...'.format(
LOG.debug('Installing utils "{0}" to the container...'.format(
utils))
result = self.run_container_command(cmd)
assert result['exit_code'] == 0, \
@ -118,8 +124,11 @@ class ContainerEngine(object):
result = self.remote.execute(check_alias_cmd)
if result['exit_code'] == 0:
return
logger.debug('Creating bash alias for {} inside container...'.format(
self.image_name))
LOG.debug(
'Creating bash alias for {} inside container...'.format(
self.image_name
)
)
create_alias_cmd = ("alias {alias_name}='docker run --user {user_id} "
"--net=\"host\" -e \"http_proxy={proxy_url}\" -t "
"-i -v {dir_for_home}:{home_bind_path} "

View File

@ -0,0 +1,318 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO(slebedev): implement unit tests
import copy
import json
import re
from devops.helpers import templates
import yaml
from mcp_tests.helpers import mcp_tests_exceptions
from mcp_tests import logger
LOG = logger.logger
class DevopsConfigMissingKey(KeyError):
def __init__(self, key, keypath):
super(DevopsConfigMissingKey, self).__init__()
self.key = key
self.keypath
def __str__(self):
return "Key '{0}' by keypath '{1}' is missing".format(
self.key,
self.keypath
)
def fail_if_obj(x):
if not isinstance(x, int):
raise TypeError("Expecting int value!")
def fix_devops_config(config):
"""Function for get correct structure of config
:param config: dict
:returns: config dict
"""
if not isinstance(config, dict):
raise mcp_tests_exceptions.DevopsConfigTypeError(
type_name=type(config).__name__
)
if 'template' in config:
return copy.deepcopy(config)
else:
return {
"template": {
"devops_settings": copy.deepcopy(config)
}
}
def list_update(obj, indexes, value):
"""Procedure for setting value into list (nested too), need
in some functions where we are not able to set value directly.
e.g.: we want to change element in nested list.
obj = [12, 34, [3, 5, [0, 4], 3], 85]
list_update(obj, [2, 2, 1], 50) => obj[2][2][1] = 50
print(obj) => [12, 34, [3, 5, [0, 50], 3], 85]
:param obj: source list
:param indexes: list with indexes for recursive process
:param value: some value for setting
"""
def check_obj(obj):
if not isinstance(obj, list):
raise TypeError("obj must be a list instance!")
check_obj(obj)
if len(indexes) > 0:
cur = obj
last_index = indexes[-1]
fail_if_obj(last_index)
for i in indexes[:-1]:
fail_if_obj(i)
check_obj(cur[i])
cur = cur[i]
cur[last_index] = value
def return_obj(indexes=[]):
"""Function returns dict() or list() object given nesting, it needs by
set_value_for_dict_by_keypath().
Examples:
return_obj() => {}
return_obj([0]) => [{}]
return_obj([-1]) => [{}]
return_obj([-1, 1, -2]) => [[None, [{}, None]]]
return_obj([2]) => [None, None, {}]
return_obj([1,3]) => [None, [None, None, None, {}]]
"""
if not isinstance(indexes, list):
raise TypeError("indexes must be a list!")
if len(indexes) > 0:
# Create resulting initial object with 1 element
result = [None]
# And save it's ref
cur = result
# lambda for extending list elements
li = (lambda x: [None] * x)
# lambda for nesting of list
nesting = (lambda x: x if x >= 0 else abs(x) - 1)
# save last index
last_index = indexes[-1]
fail_if_obj(last_index)
# loop from first till penultimate elements of indexes
# we must create nesting list and set current position to
# element at next index in indexes list
for i in indexes[:-1]:
fail_if_obj(i)
cur.extend(li(nesting(i)))
cur[i] = [None]
cur = cur[i]
# Perform last index
cur.extend(li(nesting(last_index)))
cur[last_index] = {}
return result
else:
return dict()
def keypath(paths):
"""Function to make string keypath from list of paths"""
return ".".join(list(paths))
def disassemble_path(path):
"""Func for disassembling path into key and indexes list (if needed)
:param path: string
:returns: key string, indexes list
"""
pattern = re.compile("\[([0-9]*)\]")
# find all indexes of possible list object in path
indexes = (lambda x: [int(r) for r in pattern.findall(x)]
if pattern.search(x) else [])
# get key
base_key = (lambda x: re.sub(pattern, '', x))
return base_key(path), indexes(path)
def set_value_for_dict_by_keypath(source, paths, value, new_on_missing=True):
"""Procedure for setting specific value by keypath in dict
:param source: dict
:param paths: string
:param value: value to set by keypath
"""
paths = paths.lstrip(".").split(".")
walked_paths = []
# Store the last path
last_path = paths.pop()
data = source
# loop to go through dict
while len(paths) > 0:
path = paths.pop(0)
key, indexes = disassemble_path(path)
walked_paths.append(key)
if key not in data:
if new_on_missing:
# if object is missing, we create new one
data[key] = return_obj(indexes)
else:
raise DevopsConfigMissingKey(key, keypath(walked_paths[:-1]))
data = data[key]
# if we can not get element in list, we should
# throw an exception with walked path
for i in indexes:
try:
tmp = data[i]
except IndexError as err:
LOG.error(
"Couldn't access {0} element of '{1}' keypath".format(
i, keypath(walked_paths)
)
)
LOG.error(
"Dump of '{0}':\n{1}".format(
keypath(walked_paths),
json.dumps(data)
)
)
raise type(err)(
"Can't access '{0}' element of '{1}' object! "
"'{2}' object found!".format(
i,
keypath(walked_paths),
data
)
)
data = tmp
walked_paths[-1] += "[{0}]".format(i)
key, indexes = disassemble_path(last_path)
i_count = len(indexes)
if key not in data:
if new_on_missing:
data[key] = return_obj(indexes)
else:
raise DevopsConfigMissingKey(key, keypath(walked_paths))
elif i_count > 0 and not isinstance(data[key], list):
raise TypeError(
("Key '{0}' by '{1}' keypath expected as list "
"but '{3}' obj found").format(
key, keypath(walked_paths), type(data[key]).__name__
)
)
if i_count == 0:
data[key] = value
else:
try:
list_update(data[key], indexes, value)
except (IndexError, TypeError) as err:
LOG.error(
"Error while setting by '{0}' key of '{1}' keypath".format(
last_path,
keypath(walked_paths)
)
)
LOG.error(
"Dump of object by '{0}' keypath:\n{1}".format(
keypath(walked_paths),
json.dumps(data)
)
)
raise type(err)(
"Couldn't set value by '{0}' key of '{1}' keypath'".format(
last_path,
keypath(walked_paths)
)
)
class EnvironmentConfig(object):
def __init__(self):
super(EnvironmentConfig, self).__init__()
self._config = None
@property
def config(self):
return self._config
@config.setter
def config(self, config):
"""Setter for config
:param config: dict
"""
self._config = fix_devops_config(config)
def __getitem__(self, key):
if self._config is not None:
conf = self._config['template']['devops_settings']
return copy.deepcopy(conf.get(key, None))
else:
return None
@logger.logwrap
def set_value_by_keypath(self, keypath, value):
"""Function for set value of devops settings by keypath.
It's forbidden to set value of self.config directly, so
it's possible simply set value by keypath
"""
if self.config is None:
raise mcp_tests_exceptions.DevopsConfigIsNone()
conf = self._config['template']['devops_settings']
set_value_for_dict_by_keypath(conf, keypath, value)
def save(self, filename):
"""Dump current config into given file
:param filename: string
"""
if self._config is None:
raise mcp_tests_exceptions.DevopsConfigIsNone()
with open(filename, 'w') as f:
f.write(
yaml.dump(
self._config, default_flow_style=False
)
)
def load_template(self, filename):
"""Method for reading file with devops config
:param filename: string
"""
if filename is not None:
LOG.debug(
"Preparing to load config from template '{0}'".format(
filename
)
)
self.config = templates.yaml_template_load(filename)
else:
LOG.error("Template filename is not set, loading config " +
"from template aborted.")

33
mcp_tests/helpers/ext.py Normal file
View File

@ -0,0 +1,33 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
def enum(*values, **kwargs):
names = kwargs.get('names')
if names:
return collections.namedtuple('Enum', names)(*values)
return collections.namedtuple('Enum', values)(*values)
NODE_ROLE = enum(
'master',
'slave',
'k8s',
)
NETWORK_TYPE = enum(
'private',
'public'
)

View File

@ -16,6 +16,7 @@
class UnexpectedExitCode(Exception):
def __init__(self, command, ec, expected_ec, stdout=None, stderr=None):
"""Exception for unexpected exit code after executing shell/ssh command
:param command: str - executed command
:param ec: int - actual exit code
:param expected_ec: list of integers - expected exit codes
@ -50,3 +51,61 @@ class VariableNotSet(Exception):
def __str__(self):
return "Variable {0} was not set in value {1}".format(
self.variable_name, self.expected_value)
class DevopsConfigPathIsNotSet(ValueError):
def __str__(self):
return "Devops config/template path is not set!"
class DevopsConfigTypeError(TypeError):
def __init__(self, type_name):
self.type_name = type_name
super(DevopsConfigTypeError, self).__init__()
def __str__(self):
return "Devops config should be dict instead of {0}".format(
self.type_name
)
class DevopsConfigIsNone(ValueError):
def __str__(self):
return "Devops config is None!"
class EnvironmentNameIsNotSet(ValueError):
def __str__(self):
return "Couldn't get environment name!"
class EnvironmentDoesNotExist(BaseException):
def __init__(self, env_name):
super(EnvironmentDoesNotExist, self).__init__()
self.env_name = env_name
def __str__(self):
return "Environment {0} does not exist!".format(
self.env_name
)
class EnvironmentAlreadyExists(BaseException):
def __init__(self, env_name):
super(EnvironmentAlreadyExists, self).__init__()
self.env_name = env_name
def __str__(self):
return "Environment {0} already exists!".format(
self.env_name
)
class EnvironmentIsNotSet(BaseException):
def __str__(self):
return "Environment is not set!"
class BaseImageIsNotSet(BaseException):
def __str__(self):
return "Base image for creating VMs is not set!"

View File

@ -18,22 +18,25 @@ import posixpath
import re
import traceback
from devops.helpers.helpers import wait
from devops.models.node import SSHClient
from paramiko import RSAKey
from devops.helpers import helpers
from devops.models import node
from paramiko import rsakey
import six
from mcp_tests.logger import logger
from mcp_tests.helpers.metaclasses import SingletonMeta
from mcp_tests.helpers import mcp_tests_exceptions
from mcp_tests.settings import SSH_NODE_CREDENTIALS
from mcp_tests.helpers import metaclasses
from mcp_tests import logger
from mcp_tests import settings
@six.add_metaclass(SingletonMeta)
LOG = logger.logger
@six.add_metaclass(metaclasses.SingletonMeta)
class SSHManager(object):
def __init__(self):
logger.debug('SSH_MANAGER: Run constructor SSHManager')
LOG.debug('SSH_MANAGER: Run constructor SSHManager')
self.__connections = {} # Disallow direct type change and deletion
self.ip = None
self.port = None
@ -45,9 +48,9 @@ class SSHManager(object):
return self.__connections
def initialize(self, ip,
login=SSH_NODE_CREDENTIALS['login'],
password=SSH_NODE_CREDENTIALS['password']):
""" It will be moved to __init__
login=settings.SSH_NODE_CREDENTIALS['login'],
password=settings.SSH_NODE_CREDENTIALS['password']):
"""It will be moved to __init__
:param ip: ip address of node
:param login: user name
@ -61,17 +64,18 @@ class SSHManager(object):
@staticmethod
def _connect(remote):
""" Check if connection is stable and return this one
"""Check if connection is stable and return this one
:param remote:
:return:
"""
try:
wait(lambda: remote.execute("cd ~")['exit_code'] == 0, timeout=20)
helpers.wait(lambda: remote.execute("cd ~")['exit_code'] == 0,
timeout=20)
except Exception:
logger.info('SSHManager: Check for current '
'connection fails. Try to reconnect')
logger.debug(traceback.format_exc())
LOG.info('SSHManager: Check for current '
'connection fails. Try to reconnect')
LOG.debug(traceback.format_exc())
remote.reconnect()
return remote
@ -80,26 +84,26 @@ class SSHManager(object):
remote = self.get_remote(self.ip)
key_string = '/root/.ssh/id_rsa'
with remote.open(key_string) as f:
keys.append(RSAKey.from_private_key(f))
keys.append(rsakey.RSAKey.from_private_key(f))
return keys
def get_remote(self, ip, port=22):
""" Function returns remote SSH connection to node by ip address
"""Function returns remote SSH connection to node by ip address
:param ip: IP of host
:param port: port for SSH
:return: SSHClient
:return: node.SSHClient
"""
if (ip, port) not in self.connections:
logger.debug('SSH_MANAGER:Create new connection for '
'{ip}:{port}'.format(ip=ip, port=port))
LOG.debug('SSH_MANAGER:Create new connection for '
'{ip}:{port}'.format(ip=ip, port=port))
keys = self._get_keys()
ip = self.ip
username = self.login
password = self.password
ssh_client = SSHClient(
ssh_client = node.SSHClient(
host=ip,
port=port,
username=username,
@ -109,9 +113,10 @@ class SSHManager(object):
ssh_client.sudo_mode = True
self.connections[(ip, port)] = ssh_client
logger.debug('SSH_MANAGER:Return existed connection for '
'{ip}:{port}'.format(ip=ip, port=port))
logger.debug('SSH_MANAGER: Connections {0}'.format(self.connections))
LOG.debug('SSH_MANAGER:Return existed connection for '
'{ip}:{port}'.format(ip=ip, port=port))
LOG.debug(
'SSH_MANAGER: Connections {0}'.format(self.connections))
return self._connect(self.connections[(ip, port)])
def update_connection(self, ip, login=None, password=None,
@ -126,13 +131,14 @@ class SSHManager(object):
:return: None
"""
if (ip, port) in self.connections:
logger.info('SSH_MANAGER:Close connection for {ip}:{port}'.format(
ip=ip, port=port))
LOG.info(
'SSH_MANAGER:Close connection for {ip}:{port}'.format(
ip=ip, port=port))
self.connections[(ip, port)].clear()
logger.info('SSH_MANAGER:Create new connection for '
'{ip}:{port}'.format(ip=ip, port=port))
LOG.info('SSH_MANAGER:Create new connection for '
'{ip}:{port}'.format(ip=ip, port=port))
self.connections[(ip, port)] = SSHClient(
self.connections[(ip, port)] = node.SSHClient(
host=ip,
port=port,
username=login,
@ -143,8 +149,9 @@ class SSHManager(object):
def clean_all_connections(self):
for (ip, port), connection in self.connections.items():
connection.clear()
logger.info('SSH_MANAGER:Close connection for {ip}:{port}'.format(
ip=ip, port=port))
LOG.info(
'SSH_MANAGER:Close connection for {ip}:{port}'.format(
ip=ip, port=port))
def execute(self, ip, cmd, port=22):
remote = self.get_remote(ip=ip, port=port)
@ -199,7 +206,7 @@ class SSHManager(object):
"{0} Command: '{1}' "
"Details:\n{2}".format(
error_msg, cmd, details_log))
logger.error(log_msg)
LOG.error(log_msg)
if raise_on_assert:
raise mcp_tests_exceptions.UnexpectedExitCode(
cmd,
@ -208,7 +215,7 @@ class SSHManager(object):
stdout=result['stdout_str'],
stderr=result['stderr_str'])
else:
logger.debug(details_log)
LOG.debug(details_log)
if jsonify:
try:
@ -218,7 +225,7 @@ class SSHManager(object):
error_msg = (
"Unable to deserialize output of command"
" '{0}' on host {1}".format(cmd, ip))
logger.error(error_msg)
LOG.error(error_msg)
raise Exception(error_msg)
return result
@ -229,7 +236,7 @@ class SSHManager(object):
@staticmethod
def _json_deserialize(json_string):
""" Deserialize json_string and return object
"""Deserialize json_string and return object
:param json_string: string or list with json
:return: obj
@ -242,8 +249,8 @@ class SSHManager(object):
obj = json.loads(json_string)
except Exception:
log_msg = "Unable to deserialize"
logger.error("{0}. Actual string:\n{1}".format(log_msg,
json_string))
LOG.error("{0}. Actual string:\n{1}".format(log_msg,
json_string))
raise Exception(log_msg)
return obj
@ -281,7 +288,7 @@ class SSHManager(object):
def cond_upload(self, ip, source, target, port=22, condition='',
clean_target=False):
""" Upload files only if condition in regexp matches filenames
"""Upload files only if condition in regexp matches filenames
:param ip: host ip
:param source: source path
@ -292,7 +299,8 @@ class SSHManager(object):
"""
# remote = self.get_remote(ip=ip, port=port)
# maybe we should use SSHClient function. e.g. remote.isdir(target)
# maybe we should use node.SSHClient function.
# e.g. remote.isdir(target)
# we can move this function to some *_actions class
if self.isdir_on_remote(ip=ip, port=port, path=target):
target = posixpath.join(target, os.path.basename(source))
@ -306,12 +314,12 @@ class SSHManager(object):
if re.match(condition, source):
self.upload_to_remote(ip=ip, port=port,
source=source, target=target)
logger.debug("File '{0}' uploaded to the remote folder"
" '{1}'".format(source, target))
LOG.debug("File '{0}' uploaded to the remote folder"
" '{1}'".format(source, target))
return 1
else:
logger.debug("Pattern '{0}' doesn't match the file '{1}', "
"uploading skipped".format(condition, source))
LOG.debug("Pattern '{0}' doesn't match the file '{1}', "
"uploading skipped".format(condition, source))
return 0
files_count = 0
@ -332,10 +340,12 @@ class SSHManager(object):
source=local_path,
target=remote_path)
files_count += 1
logger.debug("File '{0}' uploaded to the "
"remote folder '{1}'".format(source, target))
LOG.debug("File '{0}' uploaded to the "
"remote folder '{1}'".format(
source, target))
else:
logger.debug("Pattern '{0}' doesn't match the file '{1}', "
"uploading skipped".format(condition,
local_path))
LOG.debug(
"Pattern '{0}' doesn't match the file '{1}', "
"uploading skipped".format(condition,
local_path))
return files_count

View File

@ -15,13 +15,17 @@
import os
import time
import yaml
import traceback
from mcp_tests.logger import logger
import yaml
from mcp_tests import logger
from mcp_tests import settings
LOG = logger.logger
def get_test_method_name():
raise NotImplementedError
@ -30,6 +34,7 @@ def update_yaml(yaml_tree=None, yaml_value='', is_uniq=True,
yaml_file=settings.TIMESTAT_PATH_YAML):
"""Store/update a variable in YAML file.
yaml_tree - path to the variable in YAML file, will be created if absent,
yaml_value - value of the variable, will be overwritten if exists,
is_uniq - If false, add the unique two-digit suffix to the variable name.
@ -64,7 +69,8 @@ def update_yaml(yaml_tree=None, yaml_value='', is_uniq=True,
class TimeStat(object):
""" Context manager for measuring the execution time of the code.
"""Context manager for measuring the execution time of the code.
Usage:
with TimeStat([name],[is_uniq=True]):
"""
@ -104,8 +110,8 @@ class TimeStat(object):
update_yaml(yaml_path, '{:.2f}'.format(self.total_time),
self.is_uniq)
except Exception:
logger.error("Error storing time statistic for {0}"
" {1}".format(yaml_path, traceback.format_exc()))
LOG.error("Error storing time statistic for {0}"
" {1}".format(yaml_path, traceback.format_exc()))
raise
@property

View File

@ -13,18 +13,18 @@
# under the License.
import functools
import logging
import traceback
import os
import traceback
from mcp_tests.settings import LOGS_DIR
from mcp_tests import settings
if not os.path.exists(LOGS_DIR):
os.makedirs(LOGS_DIR)
if not os.path.exists(settings.LOGS_DIR):
os.makedirs(settings.LOGS_DIR)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s %(filename)s:'
'%(lineno)d -- %(message)s',
filename=os.path.join(LOGS_DIR, 'tests.log'),
filename=os.path.join(settings.LOGS_DIR, 'tests.log'),
filemode='w')
console = logging.StreamHandler()

View File

View File

@ -0,0 +1,350 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from devops.helpers import helpers
from devops import models
from django import db
from mcp_tests.helpers import ext
from mcp_tests.helpers import mcp_tests_exceptions as exc
from mcp_tests import logger
from mcp_tests.models import manager
LOG = logger.logger
class EnvironmentManager(manager.Manager):
"""Class-helper for creating VMs via devops environments"""
def __init__(self, config_file=None, env_name=None, master_image=None,
node_image=None, *args, **kwargs):
"""Initializing class instance
:param env_name: environment name string
:param master_image: path to master image string
:param node_image: path to node image string
"""
super(EnvironmentManager, self).__init__(*args, **kwargs)
self.config_file = config_file
self.env_name = env_name
self.master_image = master_image
self.node_image = node_image
def merge_config_params(self):
"""Merging config with instance defined params"""
if self.devops_config.config is None:
raise exc.DevopsConfigIsNone
conf = self.devops_config
node_group = conf['groups'][0]
if self.env_name is not None:
conf.set_value_by_keypath('env_name', self.env_name)
LOG.debug('env_name redefined to {0}'.format(self.env_name))
if self.master_image is not None or self.node_image is not None:
LOG.debug('Current node_group settings:\n{0}'.format(
node_group))
for node in node_group['nodes']:
volume = node['params']['volumes'][0]
if (node['role'] == ext.NODE_ROLE.master and
self.master_image is not None):
volume['source_image'] = self.master_image
elif (node['role'] == ext.NODE_ROLE.slave and
self.node_image is not None):
volume['source_image'] = self.node_image
conf.set_value_by_keypath('group[0]', node_group)
LOG.debug('Node group updated to:\n{0}'.format(node_group))
@property
def d_env_name(self):
"""Get environment name from fuel devops config
:rtype: string
"""
return self.devops_config['env_name']
def get_env_by_name(self, name):
"""Set existing environment by name
:param name: string
"""
self._env = models.Environment.get(name=name)
def create_snapshot(self, name):
"""Create named snapshot of current env.
:name: string
"""
LOG.info("Creating snapshot named '{0}'".format(name))
if self._env is not None:
self._env.suspend()
self._env.snapshot(name, force=True)
self._env.resume()
else:
raise exc.EnvironmentIsNotSet()
def revert_snapshot(self, name):
"""Revert snapshot by name
:param name: string
"""
LOG.info("Reverting from snapshot named '{0}'".format(name))
if self._env is not None:
self._env.revert(name=name)
self._env.resume()
else:
raise exc.EnvironmentIsNotSet()
def create_environment(self):
"""Create environment and start VMs.
If config was provided earlier, we simply create and start VMs,
otherwise we tries to generate config from self.config_file,
"""
if self.devops_config.config is None:
LOG.debug('Seems config for fuel-devops is not set.')
if self.config_file is None:
raise exc.DevopsConfigPathIsNotSet()
self.devops_config.load_template(self.config_file)
self.merge_config_params()
settings = self.devops_config
env_name = settings['env_name']
LOG.debug(
'Preparing to create environment named "{0}"'.format(env_name)
)
if env_name is None:
LOG.error('Environment name is not set!')
raise exc.EnvironmentNameIsNotSet()
try:
self._env = models.Environment.create_environment(
settings.config
)
except db.IntegrityError:
LOG.error(
'Seems like environment {0} already exists.'.format(env_name)
)
raise exc.EnvironmentAlreadyExists(env_name)
self._env.define()
self.start_environment()
LOG.info(
'Environment "{0}" created and started'.format(env_name)
)
def start_environment(self):
"""Method for start environment
"""
if self._env is None:
raise exc.EnvironmentIsNotSet()
self._env.start()
def wait_ssh_k8s_nodes(self):
for node in self.k8s_nodes:
LOG.debug("Waiting for SSH on node '{}...'".format(node.name))
timeout = 360
helpers.wait(
lambda: helpers.tcp_ping(self.node_ip(node), 22),
timeout=timeout,
timeout_msg="Node '{}' didn't open SSH in {} sec".format(
node.name, timeout
)
)
def resume(self):
"""Resume environment"""
if self._env is None:
raise exc.EnvironmentIsNotSet()
self._env.resume()
def suspend(self):
"""Suspend environment"""
if self._env is None:
raise exc.EnvironmentIsNotSet()
self._env.suspend()
def stop(self):
"""Stop environment"""
if self._env is None:
raise exc.EnvironmentIsNotSet()
self._env.destroy()
def has_snapshot(self, name):
return self._env.has_snapshot(name)
def delete_environment(self):
"""Delete environment
"""
LOG.debug("Deleting environment")
self._env.erase()
def __get_nodes_by_role(self, node_role):
"""Get node by given role name
:param node_role: string
:rtype: devops.models.Node
"""
LOG.debug('Trying to get nodes by role {0}'.format(node_role))
return self._env.get_nodes(role=node_role)
@property
def master_nodes(self):
"""Get all master nodes
:rtype: list
"""
nodes = self.__get_nodes_by_role(node_role=ext.NODE_ROLE.master)
return nodes
@property
def slave_nodes(self):
"""Get all slave nodes
:rtype: list
"""
nodes = self.__get_nodes_by_role(node_role=ext.NODE_ROLE.slave)
return nodes
@property
def k8s_nodes(self):
"""Get all k8s nodes
:rtype: list
"""
nodes = self.__get_nodes_by_role(node_role=ext.NODE_ROLE.k8s)
return nodes
@staticmethod
def node_ip(node):
"""Determine node's IP
:param node: devops.models.Node
:return: string
"""
LOG.debug('Trying to determine {0} ip.'.format(node.name))
return node.get_ip_address_by_network_name(
ext.NETWORK_TYPE.public
)
@property
def admin_ips(self):
"""Property to get ip of admin role VMs
:return: list
"""
nodes = self.master_nodes
return [self.node_ip(node) for node in nodes]
@property
def slave_ips(self):
"""Property to get ip(s) of slave role VMs
:return: list
"""
nodes = self.slave_nodes
return [self.node_ip(node) for node in nodes]
@property
def k8s_ips(self):
"""Property to get ip(s) of k8s role VMs
:return: list
"""
nodes = self.k8s_nodes
return [self.node_ip(node) for node in nodes]
@staticmethod
def node_ssh_client(node, login, password=None, private_keys=None):
"""Return SSHClient for node
:param node: devops.models.Node
:param login: string
:param password: string
:param private_keys: list
:rtype: devops.helpers.helpers.SSHClient
"""
LOG.debug(
'Creating ssh client for node "{0}"'.format(node.name)
)
LOG.debug(
'Using credentials: login:{0}, password:{1}, keys:{2}'.format(
login, password, private_keys
)
)
return node.remote(
network_name=ext.NETWORK_TYPE.public,
login=login,
password=password,
private_keys=private_keys
)
@staticmethod
def send_to_node(node, source, target, login,
password=None, private_keys=None):
"""Method for sending some stuff to node
:param node: devops.models.Node
:param source: string
:param target: string
:param login: string
:param password: string
:param private_keys: list
"""
LOG.debug(
"Send '{0}' to node '{1}' into target '{2}'.".format(
source,
node.name,
target
)
)
remote = EnvironmentManager.node_ssh_client(
node=node,
login=login,
password=password,
private_keys=private_keys
)
remote.upload(source=source, target=target)
def send_to_master_nodes(self, source, target, login,
password=None, private_keys=None):
"""Send given source to master nodes"""
nodes = self.master_nodes
for node in nodes:
self.send_to_node(
node,
source=source, target=target, login=login,
password=password, private_keys=private_keys
)
def send_to_slave_nodes(self, source, target, login,
password=None, private_keys=None):
"""Send given source to slave nodes"""
nodes = self.slave_nodes
for node in nodes:
self.send_to_node(
node,
source=source, target=target, login=login,
password=password, private_keys=private_keys
)
def send_to_k8s_nodes(self, source, target, login,
password=None, private_keys=None):
"""Send given source to slave nodes"""
nodes = self.k8s_nodes
for node in nodes:
self.send_to_node(
node,
source=source, target=target, login=login,
password=password, private_keys=private_keys
)

View File

@ -12,14 +12,36 @@
# License for the specific language governing permissions and limitations
# under the License.
from mcp_tests.base_test import TestBasic
from mcp_tests import base_test
from mcp_tests.helpers import env_config
class Manager(TestBasic):
"""Manager class for tests."""
class Manager(base_test.TestBasic):
"""Base manager class."""
def __init__(self, config_file, cls):
def __init__(self):
super(Manager, self).__init__()
self._devops_config = None
self.__devops_config = env_config.EnvironmentConfig()
self._start_time = 0
self._context = cls
self._env = None
@property
def devops_config(self):
return self.__devops_config
@devops_config.setter
def devops_config(self, conf):
"""Setter for self.__devops_config
:param conf: mcp_tests.helpers.env_config.EnvironmentConfig
"""
if not isinstance(conf, env_config.EnvironmentConfig):
msg = ("Unexpected type of devops config. Got '{0}' " +
"instead of '{1}'")
raise TypeError(
msg.format(
type(conf).__name__,
env_config.EnvironmentConfig.__name__
)
)
self.__devops_config = conf

View File

@ -1,8 +1,10 @@
git+git://github.com/openstack/fuel-devops.git@2.9.20
git+git://github.com/openstack/fuel-devops.git
paramiko
six
requests>=2.2.0
pytest>=2.9
docker-py
docker-compose==1.7.1
urllib3
psycopg2
python-k8sclient

View File

@ -14,14 +14,15 @@
import logging
import time
from compose import project
from compose import service
import docker
from compose.project import Project
from compose.service import Service
from mcp_tests.logger import logger
from mcp_tests.logger import console
from mcp_tests import logger
logging.getLogger('compose.service').addHandler(console)
logging.getLogger('compose.service').addHandler(logger.console)
LOG = logger.logger
class ServiceBaseTest(object):
@ -35,27 +36,27 @@ class ServiceBaseTest(object):
1. Get image from private registry
2. Start container with it
"""
logger.info("Up services")
LOG.info("Up services")
cli = docker.Client()
project_name = cls.__name__
services = []
for s in cls.services:
services.append(
Service(
service.Service(
# name=s['name'],
project=project_name,
client=cli,
**s))
cls.project = Project(
cls.project = project.Project(
name=project_name,
services=services,
client=cli)
cls.containers = cls.project.up()
wait_services = getattr(cls, 'wait_services', 5)
logger.info("Sleep {} sec until MariDB is setting up".format(
LOG.info("Sleep {} sec until MariDB is setting up".format(
wait_services))
time.sleep(wait_services)
logger.info("Start tests")
LOG.info("Start tests")
@classmethod
def teardown_class(cls):
@ -66,6 +67,6 @@ class ServiceBaseTest(object):
6. Remove volumes
"""
logger.info("Down service and remove volume")
LOG.info("Down service and remove volume")
cls.project.down(remove_image_type=False,
include_volumes=True)

View File

@ -13,18 +13,19 @@
# under the License.
import pytest
from devops.helpers.helpers import tcp_ping
from devops.helpers.helpers import wait
from devops.helpers import helpers
from mcp_tests.helpers import containers as cs
from mcp_tests import logger
from mcp_tests import service_tests
from mcp_tests import settings
from mcp_tests.helpers.containers import exec_in_container
from mcp_tests.logger import logger
from mcp_tests.service_tests import ServiceBaseTest
LOG = logger.logger
@pytest.mark.skipif(settings.PRIVATE_REGISTRY is None,
reason="PRIVATE_REGISTRY isn't set")
class TestMysqlImage(ServiceBaseTest):
class TestMysqlImage(service_tests.ServiceBaseTest):
"""Test class consits simple tests for mysql container"""
services = [
@ -47,10 +48,10 @@ class TestMysqlImage(ServiceBaseTest):
4. Check access from root user
"""
logger.info("Trying check daemon")
LOG.info("Trying check daemon")
container = self.containers[0]
cmd = 'pgrep mysqld'
out, exit_code = exec_in_container(container, cmd)
out, exit_code = cs.exec_in_container(container, cmd)
assert exit_code == 0
@pytest.mark.mysql_base
@ -61,10 +62,10 @@ class TestMysqlImage(ServiceBaseTest):
3. Check port 3306
"""
logger.info("Trying to reach port 3306")
wait(lambda: tcp_ping('localhost', 33306),
timeout=30,
timeout_msg="MySQL port in not reacheble.")
LOG.info("Trying to reach port 3306")
helpers.wait(lambda: helpers.tcp_ping('localhost', 33306),
timeout=30,
timeout_msg="MySQL port in not reacheble.")
@pytest.mark.mysql_base
def test_mysql_is_accessible(self):
@ -74,13 +75,13 @@ class TestMysqlImage(ServiceBaseTest):
4. Check access from root user
"""
logger.info("Trying fetch databases list")
LOG.info("Trying fetch databases list")
container = self.containers[0]
cmd = 'mysql -Ns -uroot -pr00tme -e "SHOW DATABASES"'
out, exit_code = exec_in_container(container, cmd)
out, exit_code = cs.exec_in_container(container, cmd)
assert exit_code == 0
out = filter(bool, out.split('\n'))
logger.info("Databases in DB - {}".format(out))
LOG.info("Databases in DB - {}".format(out))
assert set(out) == \
set(['information_schema', 'mysql', 'performance_schema'])

View File

@ -14,11 +14,16 @@
import os
import pkg_resources
import time
_boolean_states = {'1': True, 'yes': True, 'true': True, 'on': True,
'0': False, 'no': False, 'false': False, 'off': False}
_default_conf = pkg_resources.resource_filename(
__name__, 'templates/default.yaml')
# _default_conf = os.getcwd() + '/mcp_tests/templates/default.yaml'
def get_var_as_bool(name, default):
value = os.environ.get(name, '')
@ -29,7 +34,14 @@ TIMESTAT_PATH_YAML = os.environ.get(
'TIMESTAT_PATH_YAML', os.path.join(
LOGS_DIR, 'timestat_{}.yaml'.format(time.strftime("%Y%m%d"))))
SSH_NODE_CREDENTIALS = os.environ.get('SSH_NODE_CREDENTIALS',
{'login': 'test', 'password': 'test'})
{'login': 'vagrant',
'password': 'vagrant'})
ENV_NAME = os.environ.get('ENV_NAME', 'mcp_qa-test')
IMAGE_PATH = os.environ.get('IMAGE_PATH', None)
CONF_PATH = os.environ.get('CONF_PATH', os.path.abspath(_default_conf))
SUSPEND_ENV_ON_TEARDOWN = os.environ.get('SUSPEND_ENV_ON_TEARDOWN', True)
DEPLOY_SCRIPT = os.environ.get("DEPLOY_SCRIPT", None)
PRIVATE_REGISTRY = os.environ.get('PRIVATE_REGISTRY', None)

View File

@ -0,0 +1,49 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import pytest
from mcp_tests.helpers import mcp_tests_exceptions as exc
from mcp_tests import logger
from mcp_tests import settings
logging.getLogger('EnvironmentManager').addHandler(logger.console)
LOG = logger.logger
class SystemBaseTest(object):
"""SystemBaseTest contains setup/teardown for environment creation"""
@classmethod
def setup_class(cls):
"""Create Environment or use an existing one"""
LOG.info('Trying to get existing environment')
try:
cls.env.get_env_by_name(name=settings.ENV_NAME)
except exc.EnvironmentDoesNotExist:
LOG.info("Environment doesn't exist, creating new one")
cls.env.create_environment()
LOG.info("Environment created")
@pytest.mark.skipif(not settings.SUSPEND_ENV_ON_TEARDOWN,
reason="Suspend isn't needed"
)
@classmethod
def teardown_class(cls):
"""Suspend environment"""
LOG.info("Suspending environment")
cls.env.suspend()

View File

@ -0,0 +1,151 @@
import copy
import os
import subprocess
import pytest
import time
from devops import error
from mcp_tests.managers import envmanager
from mcp_tests import logger
from mcp_tests import settings
LOG = logger.logger
class TestCreateEnv(object):
"""Create VMs for mcpinstaller"""
env = envmanager.EnvironmentManager(settings.CONF_PATH)
empty_snapshot = "empty"
upgraded_snapshot = "upgraded"
deployed_snapshot = "kargo_deployed"
@classmethod
def setup_class(cls):
LOG.info("Creating environment")
try:
cls.env.get_env_by_name(name=settings.ENV_NAME)
except error.DevopsObjNotFound:
LOG.info("Environment doesn't exist, creating a new one")
cls.env.create_environment()
LOG.info("Environment created")
@pytest.mark.create_vms
def test_start_environment(self):
snapshot_name = self.empty_snapshot
LOG.info("Starting environment")
self.env.start_environment()
self.env.wait_ssh_k8s_nodes()
if not self.env.has_snapshot(snapshot_name):
self.env.create_snapshot(snapshot_name)
else:
self.env.revert_snapshot(snapshot_name)
@pytest.mark.create_vms
def test_upgrade_system_on_nodes(self):
snapshot_name = self.upgraded_snapshot
def upgrade(node):
soft_requirements = [
"git",
"python-setuptools",
"python-dev",
"python-pip",
"gcc",
"libssl-dev",
"libffi-dev",
"vim",
"software-properties-common"
]
commands = [
"apt-get update",
"apt-get upgrade -y",
"apt-get install -y {soft}".format(
soft=" ".join(soft_requirements)
),
"apt-get autoremove -y",
"pip install -U setuptools pip",
"pip install 'cryptography>=1.3.2'",
"pip install 'cffi>=1.6.0'"
]
LOG.info("Getting ssh connect to {node_name}".format(
node_name=node.name
))
remote = self.env.node_ssh_client(
node,
**settings.SSH_NODE_CREDENTIALS
)
with remote.get_sudo(remote):
for cmd in commands:
LOG.info(
"Running command '{cmd}' on node {node_name}".format(
cmd=cmd,
node_name=node.name
)
)
restart = True
while restart:
result = remote.execute(cmd)
if result['exit_code'] == 100:
# For some reasons dpkg may be locked by tasks
# for searching updates during login.
LOG.debug(
("dpkg is locked on {node_name},"
" another try in 5 secs").format(
node_name=node.name))
time.sleep(5)
restart = True
else:
restart = False
assert result['exit_code'] == 0
LOG.info("Closing connection to {}".format(node.name))
remote.close()
if not self.env.has_snapshot(snapshot_name):
for node in self.env.k8s_nodes:
upgrade(node)
self.env.create_snapshot(snapshot_name)
else:
self.env.revert_snapshot(snapshot_name)
@pytest.mark.create_vms
@pytest.mark.skipif(settings.DEPLOY_SCRIPT is None,
reason="Deploy script is not provided"
)
def test_deploy_kargo(self):
current_env = copy.deepcopy(os.environ)
kube_settings = [
"kube_network_plugin: \"calico\"",
"kube_proxy_mode: \"iptables\"",
# "kube_version: \"v1.2.5\"",
]
environment_variables = {
"SLAVE_IPS": " ".join(self.env.k8s_ips),
"ADMIN_IP": self.env.k8s_ips[0],
"CUSTOM_YAML": "\n".join(kube_settings),
"WORKSPACE": "/tmp",
}
current_env.update(dict=environment_variables)
assert self.env.has_snapshot(self.upgraded_snapshot)
self.env.revert_snapshot(self.upgraded_snapshot)
try:
process = subprocess.Popen([settings.DEPLOY_SCRIPT],
env=current_env,
shell=True,
bufsize=0,
)
assert process.wait() == 0
self.env.create_snapshot(self.deployed_snapshot)
except (SystemExit, KeyboardInterrupt) as err:
process.terminate()
raise err
@pytest.mark.skipif(not settings.SUSPEND_ENV_ON_TEARDOWN,
reason="Suspend isn't needed"
)
@classmethod
def teardown_class(cls):
LOG.info("Suspending VMs")
cls.env.suspend()

View File

@ -0,0 +1,130 @@
import pytest
from mcp_tests import logger
from mcp_tests.managers import envmanager
from mcp_tests import settings
from mcp_tests import system_tests
LOG = logger.logger
@pytest.mark.skipif(settings.ENV_NAME is None,
reason="Skip of missed images")
class TestDeployedEnv(system_tests.SystemBaseTest):
"""Basis test case for testing an existing environment
Scenario:
1. Get an existing environment (from setup_class of parent class)
2. Resume VMs for testing
3. Determine master ips (if exists)
4. Determine slaves ips
5. Check if ssh to each node could be get
6. Compare number of slaves with k8s' nodes number
7. Check if all base containers exist on nodes
8. Suspend VMs.
"""
env = envmanager.EnvironmentManager(
config_file=settings.CONF_PATH)
base_images = [
"calico/node",
"andyshinn/dnsmasq",
"quay.io/smana/kubernetes-hyperkube"
]
def running_containers(self, node):
"""Check if there are all base containers on node
:param node: devops.models.Node
"""
remote = self.env.node_ssh_client(
node,
**settings.SSH_NODE_CREDENTIALS
)
cmd = "docker ps --no-trunc --format '{{.Image}}'"
with remote.get_sudo(remote):
result = remote.execute(
command=cmd,
verbose=True
)
assert result['exit_code'] == 0
images = [x.split(":")[0] for x in result['stdout']]
assert set(self.base_images) < set(images)
@pytest.mark.env_base
def test_resume_vms(self):
"""Resume Environment"""
LOG.info("Trying to resume environment")
self.env.resume()
self.env.start_environment()
@pytest.mark.xfail
@pytest.mark.env_base
def test_get_master_ips(self):
"""Trying to determine master nodes ips"""
LOG.info("Trying to get master ips")
ips = self.env.admin_ips
LOG.debug("Master IPs: {0}".format(ips))
assert ips is not None and len(ips) > 0
@pytest.mark.xfail
@pytest.mark.env_base
def test_get_slaves_ips(self):
"""Trying to determine slave nodes ips"""
LOG.info("Trying to get slave ips")
ips = self.env.slave_ips
LOG.debug("Slave IPs: {0}".format(ips))
assert ips is not None and len(ips) > 0
@pytest.mark.env_base
def test_get_k8s_ips(self):
LOG.info("Trying to get k8s ips")
ips = self.env.k8s_ips
LOG.debug("K8S IPs: {0}".format(ips))
assert ips is not None and len(ips) > 0
@pytest.mark.env_base
def test_get_node_ssh(self):
"""Try to get remote client for each node"""
LOG.info("Get remote for each master node")
for node in self.env.master_nodes:
remote = self.env.node_ssh_client(
node, **settings.SSH_NODE_CREDENTIALS)
assert remote is not None
LOG.info("Get remote for each slave node")
for node in self.env.slave_nodes:
remote = self.env.node_ssh_client(
node, **settings.SSH_NODE_CREDENTIALS)
assert remote is not None
LOG.info("Get remote for each k8s node")
for node in self.env.k8s_nodes:
remote = self.env.node_ssh_client(
node, **settings.SSH_NODE_CREDENTIALS
)
assert remote is not None
@pytest.mark.env_base
def test_kube_nodes_number_the_same(self):
"""Check number of slaves"""
LOG.info("Check number of nodes")
master = self.env.k8s_nodes[0]
remote = self.env.node_ssh_client(
master,
**settings.SSH_NODE_CREDENTIALS
)
cmd = "kubectl get nodes -o jsonpath={.items[*].metadata.name}"
result = remote.execute(command=cmd, verbose=True)
assert result["exit_code"] == 0, "Error: {0}".format(
"".join(result["stderr"])
)
k8s_nodes = result["stdout_str"].split()
devops_nodes = self.env.k8s_nodes
assert len(k8s_nodes) == len(devops_nodes)
@pytest.mark.env_base
def test_base_container_exists(self):
"""Check if all of base container exists"""
LOG.info("Checking docker container exists")
for node in self.env.k8s_nodes:
self.running_containers(node)

View File

@ -0,0 +1,119 @@
---
aliases:
dynamic_addresses_pool:
- &pool_default !os_env POOL_DEFAULT, 10.10.0.0/16:24
default_interface_model:
- &interface_model !os_env INTERFACE_MODEL, e1000
template:
devops_settings:
env_name: !os_env ENV_NAME
address_pools:
public-pool01:
net: *pool_default
params:
vlan_start: 1210
ip_reserved:
gateway: +1
l2_network_device: +1
ip_ranges:
dhcp: [+128, -32]
rack-01: [+2, +127]
private-pool01:
net: *pool_default
storage-pool01:
net: *pool_default
management-pool01:
net: *pool_default
groups:
- name: default
driver:
name: devops.driver.libvirt
params:
connection_string: !os_env CONNECTION_STRING, qemu:///system
storage_pool_name: !os_env STORAGE_POOL_NAME, default
stp: False
hpet: False
use_host_cpu: !os_env DRIVER_USE_HOST_CPU, true
network_pools:
public: public-pool01
private: private-pool01
storage: storage-pool01
management: management-pool01
l2_network_devices:
public:
address_pool: public-pool01
dhcp: true
forward:
mode: nat
storage:
address_pool: storage-pool01
dhcp: false
management:
address_pool: management-pool01
dhcp: false
private:
address_pool: private-pool01
dhcp: false
nodes:
- name: master
role: k8s
params: &rack-01-node-params
vcpu: !os_env SLAVE_NODE_CPU, 2
memory: !os_env SLAVE_NODE_MEMORY, 2048
boot:
- network
- hd
volumes:
- name: system
capacity: !os_env NODE_VOLUME_SIZE, 50
source_image: !os_env IMAGE_PATH
format: qcow2
interfaces:
- label: iface0
l2_network_device: public
interface_model: *interface_model
- label: iface1
l2_network_device: private
interface_model: *interface_model
- label: iface2
l2_network_device: storage
interface_model: *interface_model
- label: iface3
l2_network_device: management
interface_model: *interface_model
network_config:
iface0:
networks:
- public
iface1:
networks:
- private
iface2:
networks:
- storage
iface3:
networks:
- management
- name: slave-0
role: k8s
params: *rack-01-node-params
- name: slave-1
role: k8s
params: *rack-01-node-params
#
# - name: slave-2
# role: k8s-node
# params: *rack-01-node-params

View File

View File

View File

@ -0,0 +1,141 @@
import copy
import pytest
from mcp_tests.helpers import env_config as funcs
# Test data for funcs.return_obj
testdata1 = [
([], {}),
([0], [{}]),
([1], [None, {}]),
([4, 1], [None, None, None, None, [None, {}]]),
(
[3, 1, 6],
[None, None, None, [None, [None, None, None, None, None, None, {}]]]
),
(
[-1, -3, 0],
[[[{}], None, None]]
),
(
[-1, 1, -2],
[[None, [{}, None]]]
),
]
# Test data for funcs.set_value_for_dict_by_keypath
some_dict = {}
sample1 = {'params': {'settings': {'version': 3}}}
sample2 = copy.deepcopy(sample1)
sample2.update({'env_name': 'mcp_test'})
sample3 = copy.deepcopy(sample2)
sample3.update(
{
'groups': [
{
'nodes': [
None,
{
'volumes': [
{'source_image': 'some_path'}
]
}
]
}
]
}
)
testdata2 = [
(some_dict, 'params.settings.version', 3, sample1),
(some_dict, 'env_name', 'mcp_test', sample2),
(
some_dict,
'groups[0].nodes[1].volumes[0].source_image',
'some_path',
sample3
)
]
# Test data for funcs.list_update
testdata3 = [
([None, None, None], [2], 'Test', [None, None, 'Test']),
([None, None, None], [-1], 'Test', [None, None, 'Test']),
([None, [None, [None]]], [1, 1, 0], 'Test', [None, [None, ['Test']]]),
([None, [None, [None]]], [-1, 1, 0], 'Test', [None, [None, ['Test']]]),
([None, [None, [None]]], [-1, -1, 0], 'Test', [None, [None, ['Test']]]),
([None, [None, [None]]], [-1, -1, -1], 'Test', [None, [None, ['Test']]]),
]
sample_list = [
"string",
[
"sublist string",
],
{"index": 2, "value": "dict"}
]
list_update_fail = [
(sample_list, [0, 1], "test_fail"),
(sample_list, [1, 1], "test_fail"),
(sample_list, [1, 1], "test_fail"),
(sample_list, [0, [2]], "test_fail"),
(sample_list, [0, None], "test_fail"),
(sample_list, ["a"], "test_fail")
]
sample_dict = {"root": {"subroot": {"list": ["Test", "value", [1]]}}}
keypath_fail = [
(sample_dict, "root.subroot.list[2][1]", 3, True),
(sample_dict, "root.subroot.list[1][0]", 3, True),
(sample_dict, "root.subroot[0]", 3, True),
(sample_dict, "root.subroot.undefinedkey", 3, False),
]
@pytest.mark.parametrize("x,exp", testdata1)
@pytest.mark.unit_tests
@pytest.mark.return_obj
def test_return_obj_ok(x, exp):
assert funcs.return_obj(x) == exp
@pytest.mark.xfail(strict=True)
@pytest.mark.parametrize("x", ["test_fail", [[-1]], ["test_fail"], [0, [3]]])
@pytest.mark.unit_tests
@pytest.mark.return_obj
def test_return_obj_fail(x):
result = funcs.return_obj(x)
return result
@pytest.mark.parametrize("source,keypath,value,exp", testdata2)
@pytest.mark.unit_tests
@pytest.mark.set_value_for_dict_by_keypath
def test_set_value_for_dict_by_keypath_ok(source, keypath, value, exp):
funcs.set_value_for_dict_by_keypath(source, paths=keypath, value=value)
assert source == exp
@pytest.mark.xfail(strict=True)
@pytest.mark.parametrize("source,keypath,value,make_new", keypath_fail)
@pytest.mark.set_value_for_dict_by_keypath
@pytest.mark.unit_tests
def test_set_value_for_dict_by_keypath_fail(source, keypath, value, make_new):
funcs.set_value_for_dict_by_keypath(source, paths=keypath, value=value,
new_on_missing=make_new)
@pytest.mark.parametrize('obj,indexes,value,exp', testdata3)
@pytest.mark.unit_tests
@pytest.mark.list_update
def test_list_update_ok(obj, indexes, value, exp):
funcs.list_update(obj, indexes, value)
assert obj == exp
@pytest.mark.xfail(strict=True)
@pytest.mark.parametrize('obj,indexes,value', list_update_fail)
@pytest.mark.list_update
@pytest.mark.unit_tests
def test_list_update_fail(obj, indexes, value):
funcs.list_update(obj, indexes, value)

View File

@ -14,7 +14,7 @@ deps =
-r{toxinidir}/mcp_tests/requirements.txt
-r{toxinidir}/test-requirements.txt
usedevelop = False
commands = py.test --collect-only mcp_tests
commands = py.test -k unit_tests
[testenv:venv]
commands = {posargs}