Switch to fixture using in tests

My proposal is using fixtures for our tests. So, common actions are in
  mcp_tests/system_tests/base_test.py, fixtures described in
  mcp_tests/system_tests/*_fixtures.py, and test looks like
  mcp_tests/system_tests/test_ccp_install_k8s.py

  Updated command for test:

  ENV_NAME="<env_name>" IMAGE_PATH="<path_to_qcow2_image>" WORKSPACE=/tmp DEPLOY_SCRIPT="/path/to/kargo_deploy.sh" SUSPEND_ENV_ON_TEARDOWN=false py.test -vvv -s mcp_tests/system_tests/test_ccp_install_k8s.py

Change-Id: I3f18a0463e6e0b2a2c20a83ef6c1a28f09beaeed
This commit is contained in:
Sergey Lebedev 2016-07-19 13:18:37 +03:00
parent 01f2a6cdd8
commit 11f19d01f4
15 changed files with 659 additions and 330 deletions

View File

@ -36,7 +36,8 @@ class EnvironmentManager(manager.Manager):
:param node_image: path to node image string
"""
super(EnvironmentManager, self).__init__(*args, **kwargs)
self.config_file = config_file
if config_file is not None:
self.devops_config.load_template(config_file)
self.env_name = env_name
self.master_image = master_image
self.node_image = node_image
@ -81,7 +82,7 @@ class EnvironmentManager(manager.Manager):
"""
self._env = models.Environment.get(name=name)
def create_snapshot(self, name):
def create_snapshot(self, name, description=None):
"""Create named snapshot of current env.
:name: string
@ -89,7 +90,7 @@ class EnvironmentManager(manager.Manager):
LOG.info("Creating snapshot named '{0}'".format(name))
if self._env is not None:
self._env.suspend()
self._env.snapshot(name, force=True)
self._env.snapshot(name, description=description, force=True)
self._env.resume()
else:
raise exc.EnvironmentIsNotSet()
@ -113,10 +114,7 @@ class EnvironmentManager(manager.Manager):
otherwise we tries to generate config from self.config_file,
"""
if self.devops_config.config is None:
LOG.debug('Seems config for fuel-devops is not set.')
if self.config_file is None:
raise exc.DevopsConfigPathIsNotSet()
self.devops_config.load_template(self.config_file)
raise exc.DevopsConfigPathIsNotSet()
self.merge_config_params()
settings = self.devops_config
env_name = settings['env_name']
@ -148,8 +146,6 @@ class EnvironmentManager(manager.Manager):
if self._env is None:
raise exc.EnvironmentIsNotSet()
self._env.start()
def wait_ssh_k8s_nodes(self):
for node in self.k8s_nodes:
LOG.debug("Waiting for SSH on node '{}...'".format(node.name))
timeout = 360

View File

@ -42,6 +42,7 @@ IMAGE_PATH = os.environ.get('IMAGE_PATH', None)
CONF_PATH = os.environ.get('CONF_PATH', os.path.abspath(_default_conf))
SUSPEND_ENV_ON_TEARDOWN = os.environ.get('SUSPEND_ENV_ON_TEARDOWN', True)
DEPLOY_SCRIPT = os.environ.get("DEPLOY_SCRIPT", None)
USE_CUSTOM_YAML = os.environ.get("USE_CUSTOM_YAML", True)
PRIVATE_REGISTRY = os.environ.get('PRIVATE_REGISTRY', None)

View File

@ -1,49 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import pytest
from mcp_tests.helpers import mcp_tests_exceptions as exc
from mcp_tests import logger
from mcp_tests import settings
logging.getLogger('EnvironmentManager').addHandler(logger.console)
LOG = logger.logger
class SystemBaseTest(object):
"""SystemBaseTest contains setup/teardown for environment creation"""
@classmethod
def setup_class(cls):
"""Create Environment or use an existing one"""
LOG.info('Trying to get existing environment')
try:
cls.env.get_env_by_name(name=settings.ENV_NAME)
except exc.EnvironmentDoesNotExist:
LOG.info("Environment doesn't exist, creating new one")
cls.env.create_environment()
LOG.info("Environment created")
@pytest.mark.skipif(not settings.SUSPEND_ENV_ON_TEARDOWN,
reason="Suspend isn't needed"
)
@classmethod
def teardown_class(cls):
"""Suspend environment"""
LOG.info("Suspending environment")
cls.env.suspend()

View File

@ -0,0 +1,158 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import subprocess
import os
import pytest
import yaml
from mcp_tests import logger
from mcp_tests import settings
LOG = logger.logger
LOG.addHandler(logger.console)
class SystemBaseTest(object):
"""SystemBaseTest contains setup/teardown for environment creation"""
kube_settings = {}
base_images = [
"quay.io/coreos/etcd",
"andyshinn/dnsmasq",
"quay.io/coreos/hyperkube"
]
@property
def kube_network_plugin(self):
return self.kube_settings.get('kube_network_plugin', None)
def exec_on_node(self, env, node, cmd, expected_exit_code=0):
"""Function to exec command on node and get result
:param env: mcp_tests.managers.envmanager.EnvironmentManager
:param node: devops.models.Node
:param cmd: string
:rtype: dict
"""
remote = env.node_ssh_client(
node,
**settings.SSH_NODE_CREDENTIALS
)
with remote.get_sudo(remote):
result = remote.execute(
command=cmd,
verbose=True
)
assert result['exit_code'] == expected_exit_code
return result
def check_requirement_settings(self, env, use_custom_yaml):
"""Check requirement settings"""
for node in env.k8s_nodes:
if use_custom_yaml and self.kube_network_plugin == 'calico':
self.calico_ipip_exists(node, env)
else:
self.calico_ipip_exists(node, env, exists=False)
def calico_ipip_exists(self, node, env, exists=True):
"""Check if ipip is in calico pool config
:param node: devops.models.Node
:param env: mcp_tests.managers.envmanager.EnvironmentManager
:param exists: Bool
"""
def expected(x):
if x:
return 0
else:
return 1
cmd = "calicoctl pool show | grep ipip"
self.exec_on_node(env, node, cmd, expected_exit_code=expected(exists))
def running_containers(self, node, env, use_custom_yaml):
"""Check if there are all base containers on node
:param node: devops.models.Node
:param env: mcp_tests.managers.envmanager.EnvironmentManager
:param use_custom_yaml: Bool
"""
cmd = "docker ps --no-trunc --format '{{.Image}}'"
expected_list = copy.deepcopy(self.base_images)
if self.kube_network_plugin == 'calico' and use_custom_yaml:
expected_list.append('calico/node')
result = self.exec_on_node(env, node, cmd)
images = [x.split(":")[0] for x in result['stdout']]
assert set(expected_list) < set(images)
def check_running_containers(self, env, use_custom_yaml):
"""Check running containers on each node
:param env: mcp_tests.managers.envmanager.EnvironmentManager
"""
for node in env.k8s_nodes:
self.running_containers(node, env, use_custom_yaml)
def check_number_kube_nodes(self, env, k8sclient):
"""Check number of slaves"""
LOG.info("Check number of nodes")
k8s_nodes = k8sclient.nodes.list()
devops_nodes = env.k8s_nodes
assert len(k8s_nodes) == len(devops_nodes)
def ccp_install_k8s(self, env, use_custom_yaml=False):
"""Action to deploy k8s by fuel-ccp-installer script
:param env: mcp_tests.managers.envmanager.EnvironmentManager
:param use_custom_yaml: Bool
"""
current_env = copy.deepcopy(os.environ)
environment_variables = {
"SLAVE_IPS": " ".join(env.k8s_ips),
"ADMIN_IP": env.k8s_ips[0],
}
if use_custom_yaml:
environment_variables.update(
{"CUSTOM_YAML": yaml.dump(
self.kube_settings, default_flow_style=False)}
)
current_env.update(dict=environment_variables)
self.deploy_k8s(environ=current_env)
def deploy_k8s(self, environ=os.environ):
"""Base action to deploy k8s by external deployment script"""
try:
process = subprocess.Popen([settings.DEPLOY_SCRIPT],
env=environ,
shell=True,
bufsize=0,
)
assert process.wait() == 0
except (SystemExit, KeyboardInterrupt) as err:
process.terminate()
raise err
def create_env_snapshot(self, name, env, description=None):
env.create_snapshot(name, description=description)
@pytest.mark.skipif(not settings.SUSPEND_ENV_ON_TEARDOWN,
reason="Suspend isn't needed"
)
@classmethod
def teardown_class(cls, env):
"""Suspend environment"""
LOG.info("Suspending environment")
env.suspend()

View File

@ -0,0 +1,42 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pytest
from mcp_tests import settings
@pytest.fixture
def use_custom_yaml(request):
"""Fixture to get USE_CUSTOM_YAML setting and provide its value"""
use_custom_yaml = settings.USE_CUSTOM_YAML
return use_custom_yaml
@pytest.fixture(scope='function')
def k8s_installed(request, env, use_custom_yaml):
"""Fixture to prepare needed state and revert from snapshot if it's needed
:param request: pytest.python.FixtureRequest
:param env: envmanager.EnvironmentManager
:param use_custom_yaml: Bool
"""
ACTION = "ccp_install_k8s"
install_action = getattr(request.instance, ACTION, None)
if install_action is None:
pytest.fail(msg="Test instance hasn't attribute '{0}'".format(
ACTION
))
else:
install_action(env,
use_custom_yaml=use_custom_yaml)

View File

@ -0,0 +1,56 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import division
import time
import pytest
from mcp_tests import logger
LOG = logger.logger
pytest_plugins = ['k8s_fixtures', 'env_fixtures', 'ccp_installer_fixtures']
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
outcome = yield
rep = outcome.get_result()
setattr(item, "rep_" + rep.when, rep)
def pytest_runtest_setup(item):
item.cls._current_test = item.function
item._start_time = time.time()
head = "<" * 5 + "#" * 30 + "[ {} ]" + "#" * 30 + ">" * 5
head = head.format(item.function.__name__)
start_step = "\n{head}".format(head=head)
LOG.info(start_step)
def pytest_runtest_teardown(item):
step_name = item.function.__name__
if hasattr(item, '_start_time'):
spent_time = time.time() - item._start_time
else:
spent_time = 0
minutes = spent_time // 60
seconds = int(round(spent_time)) % 60
finish_step = "FINISH {} TEST. TOOK {} min {} sec".format(
step_name, minutes, seconds
)
foot = "\n" + "<" * 5 + "#" * 30 + "[ {} ]" + "#" * 30 + ">" * 5
foot = foot.format(finish_step)
LOG.info(foot)

View File

@ -0,0 +1,158 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from devops import error
import pytest
from mcp_tests import logger
from mcp_tests import settings
from mcp_tests.managers import envmanager
LOG = logger.logger
INITIAL_SNAPSHOT_SUFFIX = "initial"
def extract_name_from_mark(mark):
"""Simple function to extract name from mark
:param mark: pytest.mark.MarkInfo
:rtype: string or None
"""
if mark:
if len(mark.args) > 0:
return mark.args[0]
elif 'name' in mark.kwargs:
return mark.kwargs['name']
return None
def make_snapshot_name(env, suffix, default):
"""Creating snapshot name
:param request: pytest.python.FixtureRequest
:param suffix: string
:param default: string or None
:rtype: string or None
"""
if env:
if suffix:
return "{0}_{1}".format(
env.d_env_name,
suffix
)
else:
return "{0}_{1}".format(
env.d_env_name,
default
)
return default
@pytest.fixture(scope='function', autouse=True)
def revert_snapshot(request, env):
"""Fixture to revert environment to snapshot
Marks:
revert_snapshot - if used this mark with 'name' parameter,
use given name as result
:param request: pytest.python.FixtureRequest
:param env: envmanager.EnvironmentManager
"""
revert_snapshot = request.keywords.get('revert_snapshot', None)
snapshot_name = make_snapshot_name(
env, extract_name_from_mark(revert_snapshot), INITIAL_SNAPSHOT_SUFFIX)
if revert_snapshot and snapshot_name:
if env.has_snapshot(snapshot_name):
LOG.info("Reverting snapshot {0}".format(snapshot_name))
env.revert_snapshot(snapshot_name)
else:
pytest.fail("Environment doesn't have snapshot named '{}'".format(
snapshot_name))
@pytest.fixture(scope="session")
def env():
"""Fixture to get EnvironmentManager instance for session
:rtype: envmanager.EnvironmentManager
"""
result = envmanager.EnvironmentManager(config_file=settings.CONF_PATH)
try:
result.get_env_by_name(result.d_env_name)
except error.DevopsObjNotFound:
LOG.info("Environment doesn't exist, creating a new one")
result.create_environment()
result.create_snapshot(make_snapshot_name(result,
INITIAL_SNAPSHOT_SUFFIX,
None))
LOG.info("Environment created")
return result
@pytest.fixture(scope='function', autouse=True)
def snapshot(request, env):
"""Fixture for creating snapshot at the end of test if it's needed
Marks:
snapshot_needed(name=None) - make snapshot if test is passed. If
name argument provided, it will be used for creating snapshot,
otherwise, test function name will be used
fail_snapshot - make snapshot if test failed
:param request: pytest.python.FixtureRequest
:param env: envmanager.EnvironmentManager
"""
snapshot_needed = request.keywords.get('snapshot_needed', None)
fail_snapshot = request.keywords.get('fail_snapshot', None)
def test_fin():
default_snapshot_name = getattr(request.node.function,
'_snapshot_name',
request.node.function.__name__)
if hasattr(request.node, 'rep_call') and request.node.rep_call.passed:
if snapshot_needed:
snapshot_name = make_snapshot_name(
env, extract_name_from_mark(snapshot_needed),
default_snapshot_name +
"_passed"
)
request.instance.create_env_snapshot(
name=snapshot_name, env=env
)
elif (hasattr(request.node, 'rep_setup') and
request.node.rep_setup.failed):
if fail_snapshot:
suffix = "{0}_prep_failed".format(
default_snapshot_name
)
request.instance.create_env_snapshot(
name=make_snapshot_name(
env, suffix, None
), env=env
)
elif (hasattr(request.node, 'rep_call') and
request.node.rep_call.failed):
if fail_snapshot:
suffix = "{0}_failed".format(
default_snapshot_name
)
snapshot_name = make_snapshot_name(
env, suffix, None
)
request.instance.create_env_snapshot(
name=snapshot_name, env=env
)
request.addfinalizer(test_fin)

View File

@ -0,0 +1,31 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pytest
from mcp_tests import settings
from mcp_tests.models.k8s import cluster
@pytest.fixture(scope='session')
def k8sclient(env):
"""Fixture to get K8sCluster instance for session
:param env: envmanager.EnvironmentManager
:rtype: cluster.K8sCluster
"""
admin_ip = env.node_ip(env.k8s_nodes[0])
k8s = cluster.K8sCluster(user=settings.KUBE_ADMIN_USER,
password=settings.KUBE_ADMIN_PASS,
host=admin_ip)
return k8s

View File

@ -0,0 +1,70 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pytest
from mcp_tests import logger
from mcp_tests import settings
import base_test
LOG = logger.logger
class TestFuelCCPInstaller(base_test.SystemBaseTest):
"""Test class for testing k8s deployed by fuel-ccp-installer"""
kube_settings = {
"kube_network_plugin": "calico",
"kube_proxy_mode": "iptables",
"hyperkube_image_repo": "quay.io/coreos/hyperkube",
"hyperkube_image_tag": "{0}_coreos.0".format(settings.KUBE_VERSION),
"etcd_deployment_type": "host",
"kube_version": settings.KUBE_VERSION,
"cloud_provider": "generic"
}
@pytest.mark.snapshot_needed
@pytest.mark.revert_snapshot
@pytest.mark.fail_snapshot
def test_k8s_installed_default(self, env, k8sclient,
use_custom_yaml=False):
"""Test for deploying an k8s environment and check it
Scenario:
1. Install k8s with default parameters.
2. Check number of nodes.
3. Basic check of running containers on nodes.
4. Check requirement base settings.
"""
self.ccp_install_k8s(env, use_custom_yaml=use_custom_yaml)
self.check_number_kube_nodes(env, k8sclient)
self.check_running_containers(env, use_custom_yaml=use_custom_yaml)
self.check_requirement_settings(env, use_custom_yaml=use_custom_yaml)
@pytest.mark.snapshot_needed
@pytest.mark.revert_snapshot
@pytest.mark.fail_snapshot
def test_k8s_installed_with_custom_yaml(self, env, k8sclient,
use_custom_yaml=True):
"""Test for deploying an k8s environment and check it
Scenario:
1. Install k8s with custom yaml.
2. Check number of nodes.
3. Basic check of running containers on nodes.
4. Check requirement base settings.
"""
self.ccp_install_k8s(env, use_custom_yaml=use_custom_yaml)
self.check_number_kube_nodes(env, k8sclient)
self.check_running_containers(env, use_custom_yaml=use_custom_yaml)
self.check_requirement_settings(env, use_custom_yaml=use_custom_yaml)

View File

@ -1,107 +0,0 @@
import copy
import os
import subprocess
import pytest
from devops import error
from mcp_tests.managers import envmanager
from mcp_tests import logger
from mcp_tests import settings
LOG = logger.logger
class TestCreateEnv(object):
"""Create VMs for mcpinstaller"""
env = envmanager.EnvironmentManager(settings.CONF_PATH)
empty_snapshot = "empty"
upgraded_snapshot = "upgraded"
deployed_snapshot = "kargo_deployed"
@classmethod
def setup_class(cls):
LOG.info("Creating environment")
try:
cls.env.get_env_by_name(name=settings.ENV_NAME)
except error.DevopsObjNotFound:
LOG.info("Environment doesn't exist, creating a new one")
cls.env.create_environment()
LOG.info("Environment created")
@pytest.mark.create_vms
def test_start_environment(self):
snapshot_name = self.empty_snapshot
LOG.info("Starting environment")
self.env.start_environment()
self.env.wait_ssh_k8s_nodes()
if not self.env.has_snapshot(snapshot_name):
self.env.create_snapshot(snapshot_name)
else:
self.env.revert_snapshot(snapshot_name)
@pytest.mark.create_vms
@pytest.mark.skipif(settings.DEPLOY_SCRIPT is None,
reason="Deploy script is not provided"
)
def test_deploy_kargo(self):
current_env = copy.deepcopy(os.environ)
kube_settings = [
"kube_network_plugin: \"calico\"",
"kube_proxy_mode: \"iptables\"",
"kube_version: \"{0}\"".format(settings.KUBE_VERSION),
]
environment_variables = {
"SLAVE_IPS": " ".join(self.env.k8s_ips),
"ADMIN_IP": self.env.k8s_ips[0],
"CUSTOM_YAML": "\n".join(kube_settings),
}
current_env.update(dict=environment_variables)
assert self.env.has_snapshot(self.empty_snapshot)
self.env.revert_snapshot(self.empty_snapshot)
try:
process = subprocess.Popen([settings.DEPLOY_SCRIPT],
env=current_env,
shell=True,
bufsize=0,
)
assert process.wait() == 0
self.env.create_snapshot(self.deployed_snapshot)
except (SystemExit, KeyboardInterrupt) as err:
process.terminate()
raise err
@pytest.mark.create_vms
@pytest.mark.skipif(settings.DEPLOY_SCRIPT is None,
reason="Deploy script is not provided"
)
def test_deploy_kargo_default(self):
snapshot_name = self.empty_snapshot
current_env = copy.deepcopy(os.environ)
environment_variables = {
"SLAVE_IPS": " ".join(self.env.k8s_ips),
"ADMIN_IP": self.env.k8s_ips[0],
"WORKSPACE": "/tmp",
}
current_env.update(dict=environment_variables)
assert self.env.has_snapshot(snapshot_name)
self.env.revert_snapshot(snapshot_name)
try:
process = subprocess.Popen([settings.DEPLOY_SCRIPT],
env=current_env,
shell=True,
bufsize=0,
)
assert process.wait() == 0
except (SystemExit, KeyboardInterrupt) as err:
process.terminate()
raise err
@pytest.mark.skipif(not settings.SUSPEND_ENV_ON_TEARDOWN,
reason="Suspend isn't needed"
)
@classmethod
def teardown_class(cls):
LOG.info("Suspending VMs")
cls.env.suspend()

View File

@ -1,130 +0,0 @@
import pytest
from mcp_tests import logger
from mcp_tests.managers import envmanager
from mcp_tests import settings
from mcp_tests import system_tests
LOG = logger.logger
@pytest.mark.skipif(settings.ENV_NAME is None,
reason="Skip of missed images")
class TestDeployedEnv(system_tests.SystemBaseTest):
"""Basis test case for testing an existing environment
Scenario:
1. Get an existing environment (from setup_class of parent class)
2. Resume VMs for testing
3. Determine master ips (if exists)
4. Determine slaves ips
5. Check if ssh to each node could be get
6. Compare number of slaves with k8s' nodes number
7. Check if all base containers exist on nodes
8. Suspend VMs.
"""
env = envmanager.EnvironmentManager(
config_file=settings.CONF_PATH)
base_images = [
"calico/node",
"andyshinn/dnsmasq",
"quay.io/smana/kubernetes-hyperkube"
]
def running_containers(self, node):
"""Check if there are all base containers on node
:param node: devops.models.Node
"""
remote = self.env.node_ssh_client(
node,
**settings.SSH_NODE_CREDENTIALS
)
cmd = "docker ps --no-trunc --format '{{.Image}}'"
with remote.get_sudo(remote):
result = remote.execute(
command=cmd,
verbose=True
)
assert result['exit_code'] == 0
images = [x.split(":")[0] for x in result['stdout']]
assert set(self.base_images) < set(images)
@pytest.mark.env_base
def test_resume_vms(self):
"""Resume Environment"""
LOG.info("Trying to resume environment")
self.env.resume()
self.env.start_environment()
@pytest.mark.xfail
@pytest.mark.env_base
def test_get_master_ips(self):
"""Trying to determine master nodes ips"""
LOG.info("Trying to get master ips")
ips = self.env.admin_ips
LOG.debug("Master IPs: {0}".format(ips))
assert ips is not None and len(ips) > 0
@pytest.mark.xfail
@pytest.mark.env_base
def test_get_slaves_ips(self):
"""Trying to determine slave nodes ips"""
LOG.info("Trying to get slave ips")
ips = self.env.slave_ips
LOG.debug("Slave IPs: {0}".format(ips))
assert ips is not None and len(ips) > 0
@pytest.mark.env_base
def test_get_k8s_ips(self):
LOG.info("Trying to get k8s ips")
ips = self.env.k8s_ips
LOG.debug("K8S IPs: {0}".format(ips))
assert ips is not None and len(ips) > 0
@pytest.mark.env_base
def test_get_node_ssh(self):
"""Try to get remote client for each node"""
LOG.info("Get remote for each master node")
for node in self.env.master_nodes:
remote = self.env.node_ssh_client(
node, **settings.SSH_NODE_CREDENTIALS)
assert remote is not None
LOG.info("Get remote for each slave node")
for node in self.env.slave_nodes:
remote = self.env.node_ssh_client(
node, **settings.SSH_NODE_CREDENTIALS)
assert remote is not None
LOG.info("Get remote for each k8s node")
for node in self.env.k8s_nodes:
remote = self.env.node_ssh_client(
node, **settings.SSH_NODE_CREDENTIALS
)
assert remote is not None
@pytest.mark.env_base
def test_kube_nodes_number_the_same(self):
"""Check number of slaves"""
LOG.info("Check number of nodes")
master = self.env.k8s_nodes[0]
remote = self.env.node_ssh_client(
master,
**settings.SSH_NODE_CREDENTIALS
)
cmd = "kubectl get nodes -o jsonpath={.items[*].metadata.name}"
result = remote.execute(command=cmd, verbose=True)
assert result["exit_code"] == 0, "Error: {0}".format(
"".join(result["stderr"])
)
k8s_nodes = result["stdout_str"].split()
devops_nodes = self.env.k8s_nodes
assert len(k8s_nodes) == len(devops_nodes)
@pytest.mark.env_base
def test_base_container_exists(self):
"""Check if all of base container exists"""
LOG.info("Checking docker container exists")
for node in self.env.k8s_nodes:
self.running_containers(node)

View File

@ -1,7 +1,7 @@
---
aliases:
dynamic_addresses_pool:
- &pool_default !os_env POOL_DEFAULT, 10.10.0.0/16:24
- &pool_default !os_env POOL_DEFAULT, 10.100.0.0/16:24
default_interface_model:
- &interface_model !os_env INTERFACE_MODEL, e1000
@ -26,7 +26,7 @@ template:
l2_network_device: +1
ip_ranges:
dhcp: [+2, -2]
ovs-pool01:
neutron-pool01:
net: *pool_default
groups:
@ -43,7 +43,7 @@ template:
network_pools:
public: public-pool01
private: private-pool01
ovs: ovs-pool01
neutron: neutron-pool01
l2_network_devices:
public:
@ -56,8 +56,8 @@ template:
address_pool: private-pool01
dhcp: true
ovs:
address_pool: ovs-pool01
neutron:
address_pool: neutron-pool01
dhcp: false
group_volumes:
@ -97,7 +97,7 @@ template:
l2_network_device: private
interface_model: *interface_model
- label: enp0s5
l2_network_device: ovs
l2_network_device: neutron
interface_model: *interface_model
network_config:
enp0s3: # Will get an IP from DHCP public-pool01
@ -108,7 +108,7 @@ template:
- private
enp0s5: # Won't get an IP, no DHCP in ovs-pool01
networks:
- ovs
- neutron
- name: node-2
role: k8s

View File

@ -0,0 +1,115 @@
---
aliases:
dynamic_addresses_pool:
- &pool_default !os_env POOL_DEFAULT, 10.100.0.0/16:24
default_interface_model:
- &interface_model !os_env INTERFACE_MODEL, e1000
template:
devops_settings:
env_name: !os_env ENV_NAME
address_pools:
public-pool01:
net: *pool_default
params:
vlan_start: 1210
ip_reserved:
gateway: +1
l2_network_device: +1
ip_ranges:
dhcp: [+128, -32]
rack-01: [+2, +127]
private-pool01:
net: *pool_default
params:
ip_reserved:
l2_network_device: +1
ip_ranges:
dhcp: [+128, -32]
neutron-pool01:
net: *pool_default
groups:
- name: default
driver:
name: devops.driver.libvirt
params:
connection_string: !os_env CONNECTION_STRING, qemu:///system
storage_pool_name: !os_env STORAGE_POOL_NAME, default
stp: False
hpet: False
use_host_cpu: !os_env DRIVER_USE_HOST_CPU, true
network_pools:
public: public-pool01
private: private-pool01
neutron: neutron-pool01
l2_network_devices:
public:
address_pool: public-pool01
dhcp: true
forward:
mode: nat
private:
address_pool: private-pool01
dhcp: true
neutron:
address_pool: neutron-pool01
dhcp: false
group_volumes:
- name: baseimage
source_image: !os_env IMAGE_PATH
format: qcow2
nodes:
- name: master
role: k8s
params: &rack-01-node-params
vcpu: !os_env SLAVE_NODE_CPU, 2
memory: !os_env SLAVE_NODE_MEMORY, 2048
boot:
- network
- hd
volumes:
- name: system
capacity: !os_env NODE_VOLUME_SIZE, 150
backing_store: baseimage
format: qcow2
- name: lvm
capacity: 10
format: qcow2
interfaces:
- label: iface0
l2_network_device: public
interface_model: *interface_model
- label: iface1
l2_network_device: private
interface_model: *interface_model
- label: iface2
l2_network_device: neutron
interface_model: *interface_model
network_config:
iface0:
networks:
- public
iface1:
networks:
- private
iface2:
networks:
- neutron
- name: slave-0
role: k8s
params: *rack-01-node-params
- name: slave-1
role: k8s
params: *rack-01-node-params

View File

@ -1,7 +1,7 @@
---
aliases:
dynamic_addresses_pool:
- &pool_default !os_env POOL_DEFAULT, 10.10.0.0/16:24
- &pool_default !os_env POOL_DEFAULT, 10.100.0.0/16:24
default_interface_model:
- &interface_model !os_env INTERFACE_MODEL, e1000
@ -23,9 +23,12 @@ template:
rack-01: [+2, +127]
private-pool01:
net: *pool_default
storage-pool01:
net: *pool_default
management-pool01:
params:
ip_reserved:
l2_network_device: +1
ip_ranges:
dhcp: [+128, -32]
neutron-pool01:
net: *pool_default
groups:
@ -42,8 +45,7 @@ template:
network_pools:
public: public-pool01
private: private-pool01
storage: storage-pool01
management: management-pool01
neutron: neutron-pool01
l2_network_devices:
public:
@ -52,16 +54,12 @@ template:
forward:
mode: nat
storage:
address_pool: storage-pool01
dhcp: false
management:
address_pool: management-pool01
dhcp: false
private:
address_pool: private-pool01
dhcp: true
neutron:
address_pool: neutron-pool01
dhcp: false
group_volumes:
@ -80,7 +78,7 @@ template:
- hd
volumes:
- name: system
capacity: !os_env NODE_VOLUME_SIZE, 50
capacity: !os_env NODE_VOLUME_SIZE, 150
backing_store: baseimage
format: qcow2
@ -92,10 +90,7 @@ template:
l2_network_device: private
interface_model: *interface_model
- label: iface2
l2_network_device: storage
interface_model: *interface_model
- label: iface3
l2_network_device: management
l2_network_device: neutron
interface_model: *interface_model
network_config:
iface0:
@ -106,10 +101,7 @@ template:
- private
iface2:
networks:
- storage
iface3:
networks:
- management
- neutron
- name: slave-0
role: k8s
@ -118,7 +110,3 @@ template:
- name: slave-1
role: k8s
params: *rack-01-node-params
#
# - name: slave-2
# role: k8s-node
# params: *rack-01-node-params

View File

@ -14,7 +14,7 @@ deps =
-r{toxinidir}/mcp_tests/requirements.txt
-r{toxinidir}/test-requirements.txt
usedevelop = False
commands = py.test -k unit_tests
commands = py.test -s -vvv mcp_tests/tests
[testenv:venv]
commands = {posargs}