Initial commit with some basic code

Also add some base helpers

Change-Id: I173240c0fdbf4cc7355e80739d5d10df8e9dd054
Tatyana Leontovich 2016-05-17 12:22:34 +03:00
parent c7b4405179
commit 9e7594899c
19 changed files with 986 additions and 0 deletions

61
.gitignore vendored Normal file

@ -0,0 +1,61 @@
*.py[cod]
# C extensions
*.so
# Packages
*.egg
*.egg-info
dist
build
include
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
lib
local
lib64
MANIFEST
TAGS
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
.tox
nosetests.xml
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.cache
.project
.pydevproject
.idea
# Local example
example_local.py
# Local settings
local_settings.py
# Documentation
doc/_build/
# Logs
/logs
tests.log
# Certs
/ca.crt
/ca.pem
# Cache
/.cache

0
mcp_tests/__init__.py Normal file

32
mcp_tests/base_test.py Normal file

@ -0,0 +1,32 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mcp_tests.helpers.containers import ContainerEngine
from mcp_tests.helpers.ssh_manager import SSHManager
class TestBasic(object):
"""Basic test case class for tests.
"""
def __init__(self):
self._devops_config = None
@property
def ssh_manager(self):
return SSHManager()
@property
def container_engine(self):
return ContainerEngine()
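
A minimal sketch of how a test case might build on TestBasic; the class and method names are hypothetical and only the ssh_manager property is exercised:

from mcp_tests.base_test import TestBasic

class TestExample(TestBasic):  # hypothetical test class
    def check_uptime(self, ip):
        # self.ssh_manager returns the shared SSHManager singleton
        return self.ssh_manager.execute(ip, 'uptime')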

0
mcp_tests/helpers/__init__.py Normal file

147
mcp_tests/helpers/containers.py Normal file

@ -0,0 +1,147 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import division
from mcp_tests.logger import logger
class ContainerEngine(object):
def __init__(self,
remote=None,
image_name=None,
container_repo=None,
proxy_url=None,
user_id=0,
container_name=None,
dir_for_home='/var/home',
):
self.remote = remote
self.container_repo = container_repo
self.repository_tag = 'latest'
self.proxy_url = proxy_url or ""
self.user_id = user_id
self.image_name = image_name
self.container_name = container_name
self.dir_for_home = dir_for_home
self.home_bind_path = '{0}/{1}'.format(
self.dir_for_home, self.container_name)
self.setup()
def image_exists(self, tag='latest'):
cmd = "docker images | grep {0}| awk '{{print $1}}'".format(
self.image_name)
logger.info('Checking Docker images...')
result = self.remote.execute(cmd)
logger.debug(result)
existing_images = [line.strip().split() for line in result['stdout']]
return [self.container_repo, tag] in existing_images
def pull_image(self):
# TODO: add possibility to load image from local path or
# remote link provided in settings, in order to speed up downloading
cmd = 'docker pull {0}'.format(self.container_repo)
logger.debug('Downloading repository/image from registry...')
result = self.remote.execute(cmd)
logger.debug(result)
return self.image_exists()
def run_container_command(self, command, in_background=False):
command = str(command).replace(r"'", r"'\''")
options = ''
if in_background:
options = '{0} -d'.format(options)
cmd = ("docker run {options} --user {user_id} --net=\"host\" -e "
"\"http_proxy={proxy_url}\" -e \"https_proxy={proxy_url}\" "
"-v {dir_for_home}:{home_bind_path} {container_repo}:{tag} "
"/bin/bash -c '{command}'".format(
options=options,
user_id=self.user_id,
proxy_url=self.proxy_url,
dir_for_home=self.dir_for_home,
home_bind_path=self.home_bind_path,
container_repo=self.container_repo,
tag=self.repository_tag,
command=command))
logger.debug('Executing command "{0}" in container {1}...'.format(
cmd, self.container_repo))
result = self.remote.execute(cmd)
logger.debug(result)
return result
def setup_utils(self):
utils = ['gawk', 'vim', 'curl']
cmd = ('unset http_proxy https_proxy; apt-get update; '
'apt-get install -y {0}'.format(' '.join(utils)))
logger.debug('Installing utils "{0}" to the container...'.format(
utils))
result = self.run_container_command(cmd)
assert result['exit_code'] == 0, (
'Utils installation failed in container: '
'{0}'.format(result))
def prepare_image(self):
self.setup_utils()
last_container_cmd = "docker ps -lq"
result = self.remote.execute(last_container_cmd)
assert result['exit_code'] == 0, (
"Unable to get last container ID: {0}!".format(result))
last_container = ''.join([line.strip() for line in result['stdout']])
commit_cmd = 'docker commit {0} {1}:ready'.format(last_container,
self.container_repo)
result = self.remote.execute(commit_cmd)
assert result['exit_code'] == 0, (
'Commit to Docker image "{0}" failed: {1}.'.format(
self.container_repo, result))
return self.image_exists(tag='ready')
def setup_bash_alias(self):
alias_name = '{}_docker'.format(self.image_name)
check_alias_cmd = '. /root/.bashrc && alias {0}'.format(alias_name)
result = self.remote.execute(check_alias_cmd)
if result['exit_code'] == 0:
return
logger.debug('Creating bash alias for {} inside container...'.format(
self.image_name))
create_alias_cmd = ("alias {alias_name}='docker run --user {user_id} "
"--net=\"host\" -e \"http_proxy={proxy_url}\" -t "
"-i -v {dir_for_home}:{home_bind_path} "
"{container_repo}:{tag} {image_name}'".format(
alias_name=alias_name,
user_id=self.user_id,
proxy_url=self.proxy_url,
dir_for_home=self.dir_for_home,
home_bind_path=self.home_bind_path,
container_repo=self.container_repo,
tag=self.repository_tag,
image_name=self.image_name))
result = self.remote.execute('echo "{0}">> /root/.bashrc'.format(
create_alias_cmd))
assert result['exit_code'] == 0, (
"Alias creation for running {0} from container failed: "
"{1}.".format(self.image_name, result))
result = self.remote.execute(check_alias_cmd)
assert result['exit_code'] == 0, (
"Alias creation for running {0} from container failed: "
"{1}.".format(self.image_name, result))
def setup(self):
if not self.image_exists():
assert self.pull_image(), (
"Docker image for {} not found!".format(self.image_name))
if not self.image_exists(tag='ready'):
assert self.prepare_image(), (
"Docker image for {} is not ready!".format(self.image_name))
self.repository_tag = 'ready'
self.setup_bash_alias()
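
A hedged usage sketch for ContainerEngine; the node IP, image name and registry path are made-up example values, and the SSH connection is assumed to be reachable:

from mcp_tests.helpers.ssh_manager import SSHManager
from mcp_tests.helpers.containers import ContainerEngine

ssh = SSHManager()
ssh.initialize('10.109.0.2')                    # assumed node address
remote = ssh.get_remote('10.109.0.2')

engine = ContainerEngine(                       # __init__ calls setup(), which pulls and
    remote=remote,                              # prepares the image when it is missing
    image_name='mysql',
    container_repo='localhost:5002/mysql',      # assumed private registry path
    container_name='mysql-test')
result = engine.run_container_command('mysql --version')
assert result['exit_code'] == 0, result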

52
mcp_tests/helpers/mcp_tests_exceptions.py Normal file

@ -0,0 +1,52 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class UnexpectedExitCode(Exception):
def __init__(self, command, ec, expected_ec, stdout=None, stderr=None):
"""Exception for unexpected exit code after executing shell/ssh command
:param command: str - executed command
:param ec: int - actual exit code
:param expected_ec: list of integers - expected exit codes
:param stdout: str
:param stderr: str
"""
self.ec = ec
self.expected_ec = expected_ec
self.cmd = command
self.stdout = stdout
self.stderr = stderr
super(UnexpectedExitCode, self).__init__()
def __str__(self):
message = "Command '{cmd:s}' returned unexpected exit code {code:d}," \
" while waiting for {exp}".format(cmd=self.cmd,
code=self.ec,
exp=self.expected_ec)
if self.stdout:
message += "stdout: {}\n".format(self.stdout)
if self.stderr:
message += "stderr: {}\n".format(self.stderr)
return message
class VariableNotSet(Exception):
def __init__(self, variable_name, expected_value):
self.variable_name = variable_name
self.expected_value = expected_value
super(VariableNotSet, self).__init__()
def __str__(self):
return "Variable {0} was not set in value {1}".format(
self.variable_name, self.expected_value)
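
For illustration, a small sketch of how UnexpectedExitCode renders; the command and exit codes are arbitrary examples:

from mcp_tests.helpers.mcp_tests_exceptions import UnexpectedExitCode

try:
    raise UnexpectedExitCode('uname -a', 127, [0], stderr='command not found')
except UnexpectedExitCode as e:
    # "Command 'uname -a' returned unexpected exit code 127, while waiting for [0]..."
    print(str(e))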

27
mcp_tests/helpers/metaclasses.py Normal file

@ -0,0 +1,27 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class SingletonMeta(type):
"""Metaclass for Singleton
Main goal: avoid having to implement __new__ in singleton classes
"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(
SingletonMeta, cls).__call__(*args, **kwargs)
return cls._instances[cls]
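
A quick sketch of the singleton behaviour this metaclass provides; the Config class is a hypothetical example:

import six
from mcp_tests.helpers.metaclasses import SingletonMeta

@six.add_metaclass(SingletonMeta)
class Config(object):  # hypothetical example class
    pass

assert Config() is Config()  # every call returns the same cached instance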

341
mcp_tests/helpers/ssh_manager.py Normal file

@ -0,0 +1,341 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import posixpath
import re
import traceback
from devops.helpers.helpers import wait
from devops.models.node import SSHClient
from paramiko import RSAKey
import six
from mcp_tests.logger import logger
from mcp_tests.helpers.metaclasses import SingletonMeta
from mcp_tests.helpers import mcp_tests_exceptions
from mcp_tests.settings import SSH_NODE_CREDENTIALS
@six.add_metaclass(SingletonMeta)
class SSHManager(object):
def __init__(self):
logger.debug('SSH_MANAGER: Run constructor SSHManager')
self.__connections = {} # Disallow direct type change and deletion
self.ip = None
self.port = None
self.login = None
self.password = None
@property
def connections(self):
return self.__connections
def initialize(self, ip,
login=SSH_NODE_CREDENTIALS['login'],
password=SSH_NODE_CREDENTIALS['password']):
""" It will be moved to __init__
:param ip: ip address of node
:param login: user name
:param password: password for user
:return: None
"""
self.ip = ip
self.port = 22
self.login = login
self.password = password
@staticmethod
def _connect(remote):
""" Check if connection is stable and return this one
:param remote:
:return:
"""
try:
wait(lambda: remote.execute("cd ~")['exit_code'] == 0, timeout=20)
except Exception:
logger.info('SSHManager: Check of the current '
'connection failed. Trying to reconnect')
logger.debug(traceback.format_exc())
remote.reconnect()
return remote
def _get_keys(self):
keys = []
remote = self.get_remote(self.ip)
key_string = '/root/.ssh/id_rsa'
with remote.open(key_string) as f:
keys.append(RSAKey.from_private_key(f))
return keys
def get_remote(self, ip, port=22):
""" Function returns remote SSH connection to node by ip address
:param ip: IP of host
:param port: port for SSH
:return: SSHClient
"""
if (ip, port) not in self.connections:
logger.debug('SSH_MANAGER:Create new connection for '
'{ip}:{port}'.format(ip=ip, port=port))
keys = self._get_keys()
ip = self.ip
username = self.login
password = self.password
ssh_client = SSHClient(
host=ip,
port=port,
username=username,
password=password,
private_keys=keys
)
ssh_client.sudo_mode = True
self.connections[(ip, port)] = ssh_client
logger.debug('SSH_MANAGER:Return existing connection for '
'{ip}:{port}'.format(ip=ip, port=port))
logger.debug('SSH_MANAGER: Connections {0}'.format(self.connections))
return self._connect(self.connections[(ip, port)])
def update_connection(self, ip, login=None, password=None,
keys=None, port=22):
"""Update existed connection
:param ip: host ip string
:param login: login string
:param password: password string
:param keys: list of keys
:param port: ssh port int
:return: None
"""
if (ip, port) in self.connections:
logger.info('SSH_MANAGER:Close connection for {ip}:{port}'.format(
ip=ip, port=port))
self.connections[(ip, port)].clear()
logger.info('SSH_MANAGER:Create new connection for '
'{ip}:{port}'.format(ip=ip, port=port))
self.connections[(ip, port)] = SSHClient(
host=ip,
port=port,
username=login,
password=password,
private_keys=keys if keys is not None else []
)
def clean_all_connections(self):
for (ip, port), connection in self.connections.items():
connection.clear()
logger.info('SSH_MANAGER:Close connection for {ip}:{port}'.format(
ip=ip, port=port))
def execute(self, ip, cmd, port=22):
remote = self.get_remote(ip=ip, port=port)
return remote.execute(cmd)
def check_call(self, ip, cmd, port=22, verbose=False):
remote = self.get_remote(ip=ip, port=port)
return remote.check_call(cmd, verbose)
def execute_on_remote(self, ip, cmd, port=22, err_msg=None,
jsonify=False, assert_ec_equal=None,
raise_on_assert=True):
"""Execute ``cmd`` on ``remote`` and return result.
:param ip: ip of host
:param port: ssh port
:param cmd: command to execute on remote host
:param err_msg: custom error message
:param assert_ec_equal: list of expected exit_code
:param raise_on_assert: Boolean
:return: dict
:raise: Exception
"""
if assert_ec_equal is None:
assert_ec_equal = [0]
result = self.execute(ip=ip, port=port, cmd=cmd)
result['stdout_str'] = ''.join(result['stdout']).strip()
result['stdout_len'] = len(result['stdout'])
result['stderr_str'] = ''.join(result['stderr']).strip()
result['stderr_len'] = len(result['stderr'])
details_log = (
"Host: {host}\n"
"Command: '{cmd}'\n"
"Exit code: {code}\n"
"STDOUT:\n{stdout}\n"
"STDERR:\n{stderr}".format(
host=ip, cmd=cmd, code=result['exit_code'],
stdout=result['stdout_str'], stderr=result['stderr_str']
))
if result['exit_code'] not in assert_ec_equal:
error_msg = (
err_msg or
"Unexpected exit_code returned: actual {0}, expected {1}."
"".format(
result['exit_code'],
' '.join(map(str, assert_ec_equal))))
log_msg = (
"{0} Command: '{1}' "
"Details:\n{2}".format(
error_msg, cmd, details_log))
logger.error(log_msg)
if raise_on_assert:
raise mcp_tests_exceptions.UnexpectedExitCode(
cmd,
result['exit_code'],
assert_ec_equal,
stdout=result['stdout_str'],
stderr=result['stderr_str'])
else:
logger.debug(details_log)
if jsonify:
try:
result['stdout_json'] = \
self._json_deserialize(result['stdout_str'])
except Exception:
error_msg = (
"Unable to deserialize output of command"
" '{0}' on host {1}".format(cmd, ip))
logger.error(error_msg)
raise Exception(error_msg)
return result
def execute_async_on_remote(self, ip, cmd, port=22):
remote = self.get_remote(ip=ip, port=port)
return remote.execute_async(cmd)
@staticmethod
def _json_deserialize(json_string):
""" Deserialize json_string and return object
:param json_string: string or list with json
:return: obj
:raise: Exception
"""
if isinstance(json_string, list):
json_string = ''.join(json_string)
try:
obj = json.loads(json_string)
except Exception:
log_msg = "Unable to deserialize"
logger.error("{0}. Actual string:\n{1}".format(log_msg,
json_string))
raise Exception(log_msg)
return obj
def open_on_remote(self, ip, path, mode='r', port=22):
remote = self.get_remote(ip=ip, port=port)
return remote.open(path, mode)
def upload_to_remote(self, ip, source, target, port=22):
remote = self.get_remote(ip=ip, port=port)
return remote.upload(source, target)
def download_from_remote(self, ip, destination, target, port=22):
remote = self.get_remote(ip=ip, port=port)
return remote.download(destination, target)
def exists_on_remote(self, ip, path, port=22):
remote = self.get_remote(ip=ip, port=port)
return remote.exists(path)
def isdir_on_remote(self, ip, path, port=22):
remote = self.get_remote(ip=ip, port=port)
return remote.isdir(path)
def isfile_on_remote(self, ip, path, port=22):
remote = self.get_remote(ip=ip, port=port)
return remote.isfile(path)
def mkdir_on_remote(self, ip, path, port=22):
remote = self.get_remote(ip=ip, port=port)
return remote.mkdir(path)
def rm_rf_on_remote(self, ip, path, port=22):
remote = self.get_remote(ip=ip, port=port)
return remote.rm_rf(path)
def cond_upload(self, ip, source, target, port=22, condition='',
clean_target=False):
""" Upload files only if condition in regexp matches filenames
:param ip: host ip
:param source: source path
:param target: destination path
:param port: ssh port
:param condition: regexp condition
:return: count of files
"""
# remote = self.get_remote(ip=ip, port=port)
# maybe we should use SSHClient function. e.g. remote.isdir(target)
# we can move this function to some *_actions class
if self.isdir_on_remote(ip=ip, port=port, path=target):
target = posixpath.join(target, os.path.basename(source))
if clean_target:
self.rm_rf_on_remote(ip=ip, port=port, path=target)
self.mkdir_on_remote(ip=ip, port=port, path=target)
source = os.path.expanduser(source)
if not os.path.isdir(source):
if re.match(condition, source):
self.upload_to_remote(ip=ip, port=port,
source=source, target=target)
logger.debug("File '{0}' uploaded to the remote folder"
" '{1}'".format(source, target))
return 1
else:
logger.debug("Pattern '{0}' doesn't match the file '{1}', "
"uploading skipped".format(condition, source))
return 0
files_count = 0
for rootdir, _, files in os.walk(source):
targetdir = os.path.normpath(
os.path.join(
target,
os.path.relpath(rootdir, source))).replace("\\", "/")
self.mkdir_on_remote(ip=ip, port=port, path=targetdir)
for entry in files:
local_path = os.path.join(rootdir, entry)
remote_path = posixpath.join(targetdir, entry)
if re.match(condition, local_path):
self.upload_to_remote(ip=ip,
port=port,
source=local_path,
target=remote_path)
files_count += 1
logger.debug("File '{0}' uploaded to the "
"remote folder '{1}'".format(source, target))
else:
logger.debug("Pattern '{0}' doesn't match the file '{1}', "
"uploading skipped".format(condition,
local_path))
return files_count
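
A hedged sketch of the SSHManager workflow; the node IP is a placeholder and the commands are arbitrary examples:

from mcp_tests.helpers.ssh_manager import SSHManager

ssh = SSHManager()                    # SingletonMeta: repeated calls share one instance
ssh.initialize('10.109.0.2')          # credentials default to SSH_NODE_CREDENTIALS
result = ssh.execute_on_remote(ip='10.109.0.2', cmd='uptime')
print(result['stdout_str'])

# jsonify=True parses stdout into result['stdout_json']; a non-zero exit code
# raises UnexpectedExitCode unless raise_on_assert=False is passed.
data = ssh.execute_on_remote(ip='10.109.0.2',
                             cmd='echo \'{"status": "ok"}\'',
                             jsonify=True)['stdout_json']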

113
mcp_tests/helpers/utils.py Normal file

@ -0,0 +1,113 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
import yaml
import traceback
from mcp_tests.logger import logger
from mcp_tests import settings
def get_test_method_name():
raise NotImplementedError
def update_yaml(yaml_tree=None, yaml_value='', is_uniq=True,
yaml_file=settings.TIMESTAT_PATH_YAML):
"""Store/update a variable in YAML file.
yaml_tree - path to the variable in YAML file, will be created if absent,
yaml_value - value of the variable, will be overwritten if exists,
is_uniq - If False, add a unique two-digit suffix to the variable name.
"""
if yaml_tree is None:
yaml_tree = []
yaml_data = {}
if os.path.isfile(yaml_file):
with open(yaml_file, 'r') as f:
yaml_data = yaml.load(f)
# Walk through the 'yaml_data' dict, find or create a tree using
# sub-keys in order provided in 'yaml_tree' list
item = yaml_data
for n in yaml_tree[:-1]:
if n not in item:
item[n] = {}
item = item[n]
if is_uniq:
last = yaml_tree[-1]
else:
# Create a unique suffix in range '_00' to '_99'
for n in range(100):
last = str(yaml_tree[-1]) + '_' + str(n).zfill(2)
if last not in item:
break
item[last] = yaml_value
with open(yaml_file, 'w') as f:
yaml.dump(yaml_data, f, default_flow_style=False)
class TimeStat(object):
""" Context manager for measuring the execution time of the code.
Usage:
with TimeStat([name],[is_uniq=True]):
"""
def __init__(self, name=None, is_uniq=False):
if name:
self.name = name
else:
self.name = 'timestat'
self.is_uniq = is_uniq
self.begin_time = 0
self.end_time = 0
self.total_time = 0
def __enter__(self):
self.begin_time = time.time()
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.end_time = time.time()
self.total_time = self.end_time - self.begin_time
# Create a path where the 'self.total_time' will be stored.
yaml_path = []
# There will be a list of one or two yaml subkeys:
# - first key name is the method name of the test
method_name = get_test_method_name()
if method_name:
yaml_path.append(method_name)
# - second (subkey) name is provided from the decorator (the name of
# the just executed function), or manually.
yaml_path.append(self.name)
try:
update_yaml(yaml_path, '{:.2f}'.format(self.total_time),
self.is_uniq)
except Exception:
logger.error("Error storing time statistic for {0}"
" {1}".format(yaml_path, traceback.format_exc()))
raise
@property
def spent_time(self):
return time.time() - self.begin_time
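
A small sketch of update_yaml on its own (TimeStat also relies on get_test_method_name(), which is still a stub in this commit); the file path is a throwaway example:

from mcp_tests.helpers.utils import update_yaml

update_yaml(['test_mysql', 'setup'], '12.34',
            yaml_file='/tmp/timestat_example.yaml')
# /tmp/timestat_example.yaml now contains:
# test_mysql:
#   setup: '12.34'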

73
mcp_tests/logger.py Normal file

@ -0,0 +1,73 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import logging
import traceback
import os
from mcp_tests.settings import LOGS_DIR
if not os.path.exists(LOGS_DIR):
os.makedirs(LOGS_DIR)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s %(filename)s:'
'%(lineno)d -- %(message)s',
filename=os.path.join(LOGS_DIR, 'tests.log'),
filemode='w')
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s %(filename)s:'
'%(lineno)d -- %(message)s')
console.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(console)
# suppress iso8601 and paramiko debug logging
class NoDebugMessageFilter(logging.Filter):
def filter(self, record):
return not record.levelno <= logging.DEBUG
logging.getLogger('paramiko.transport').addFilter(NoDebugMessageFilter())
logging.getLogger('paramiko.hostkeys').addFilter(NoDebugMessageFilter())
logging.getLogger('iso8601.iso8601').addFilter(NoDebugMessageFilter())
def debug(logger):
def wrapper(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
logger.debug(
"Calling: {} with args: {} {}".format(
func.__name__, args, kwargs
)
)
try:
result = func(*args, **kwargs)
logger.debug(
"Done: {} with result: {}".format(func.__name__, result))
except BaseException as e:
logger.error(
'{func} raised: {exc!r}\n'
'Traceback: {tb!s}'.format(
func=func.__name__, exc=e, tb=traceback.format_exc()))
raise
return result
return wrapped
return wrapper
logwrap = debug(logger)
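
A brief sketch of the logwrap decorator in use; the wrapped function is a hypothetical example:

from mcp_tests.logger import logger, logwrap

@logwrap
def add(a, b):  # hypothetical helper
    return a + b

add(1, 2)  # DEBUG log: "Calling: add with args: (1, 2) {}" then "Done: add with result: 3"
logger.info('messages at INFO and above also reach the console handler')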

@ -0,0 +1,25 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mcp_tests.base_test import TestBasic
class Manager(TestBasic):
"""Manager class for tests."""
def __init__(self, config_file, cls):
super(Manager, self).__init__()
self._devops_config = None
self._start_time = 0
self._context = cls

6
mcp_tests/requirements.txt Normal file

@ -0,0 +1,6 @@
git+git://github.com/openstack/fuel-devops.git@2.9.20
paramiko
six
requests>=2.2.0
pytest>=2.9
docker-py

@ -0,0 +1,44 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pytest
import traceback
from mcp_tests.helpers import mcp_tests_exceptions
from mcp_tests.logger import logger
from mcp_tests import settings
class TestMysqlImage(object):
"""Test class consits simple tests for mysql container"""
@pytest.mark.mysql_base
def test_mysql_is_running(self):
"""Start container from image, check if mysql is running
Scenario:
1. Get image from private registry
2. Start container with it
3. Check if mysql is running
4. Destroy container
"""
logger.info('Check if the registry is set: {0}'.format(
settings.PRIVATE_REGISTRY))
try:
if not settings.PRIVATE_REGISTRY:
raise mcp_tests_exceptions.VariableNotSet(
settings.PRIVATE_REGISTRY, 'localhost:5002/registry')
except mcp_tests_exceptions.VariableNotSet:
logger.error(traceback.format_exc())
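
The test only logs an error when the registry variable is missing; a hedged sketch of configuring it before the settings module is imported (the registry value is an assumed example):

import os
os.environ.setdefault('PRIVATE_REGISTRY', 'localhost:5002/registry')  # assumed value
from mcp_tests import settings

assert settings.PRIVATE_REGISTRY  # the test raises (and logs) VariableNotSet when this is empty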

34
mcp_tests/settings.py Normal file

@ -0,0 +1,34 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
_boolean_states = {'1': True, 'yes': True, 'true': True, 'on': True,
'0': False, 'no': False, 'false': False, 'off': False}
def get_var_as_bool(name, default):
value = os.environ.get(name, '')
return _boolean_states.get(value.lower(), default)
LOGS_DIR = os.environ.get('LOGS_DIR', os.getcwd())
TIMESTAT_PATH_YAML = os.environ.get(
'TIMESTAT_PATH_YAML', os.path.join(
LOGS_DIR, 'timestat_{}.yaml'.format(time.strftime("%Y%m%d"))))
SSH_NODE_CREDENTIALS = os.environ.get('SSH_NODE_CREDENTIALS',
{'login': 'test', 'password': 'test'})
PRIVATE_REGISTRY = os.environ.get('PRIVATE_REGISTRY')
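
A short sketch of get_var_as_bool; the environment variable names are hypothetical examples:

import os
from mcp_tests.settings import get_var_as_bool

os.environ['EXAMPLE_FLAG'] = 'yes'
assert get_var_as_bool('EXAMPLE_FLAG', False) is True
assert get_var_as_bool('UNSET_FLAG', True) is True  # unknown/unset values fall back to the default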

3
pytest.ini Normal file

@ -0,0 +1,3 @@
[pytest]
addopts = -vvv -s -p no:django -p no:ipdb --junit-xml=nosetests.xml
testpaths = mcp_tests

28
tox.ini Normal file

@ -0,0 +1,28 @@
# Tox (http://tox.testrun.org/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
skipsdist = True
envlist = pep8, py27
skip_missing_interpreters = True
[testenv]
deps = -r{toxinidir}/mcp_tests/requirements.txt
usedevelop = False
commands = py.test
[testenv:pep8]
deps = flake8
usedevelop = False
exclude = .venv,.git,.tox,.cache,.lib,dist,doc,*egg,build,local*
commands =
flake8 {posargs:.}
[flake8]
ignore = H302,H802
exclude = .venv,.git,.tox,dist,doc,*egg,build,local,./lib
show-pep8 = True
show-source = True
count = True