Move ha_neutron and bvt to fuel_tests

Add ability to run_system_test run test from fuel_tests

move test groups into fuel_tests:
    bvt2 with new name is pytest_bvt2
    ha_neutron

Change-Id: Ic6a0d78e113f58af388ac2abe758a8be5e3e5867
This commit is contained in:
Dmitry Tyzhnenko 2016-04-24 22:42:36 +03:00 committed by Dmitry Tyzhnenko
parent ca510e241d
commit 6a35a38ab9
13 changed files with 803 additions and 2 deletions

34
doc/fuel_tests.rst Normal file
View File

@ -0,0 +1,34 @@
.. index:: Fuel tests
Fuel tests
**********
PyTest test config
==================
Conftest for Tests
------------------
.. automodule:: fuel_tests.tests.conftest
:members:
Models
======
Manager
----------
.. automodule:: fuel_tests.models.manager
:members:
Tests
=====
Ceph Tests
----------
.. automodule:: fuel_tests.tests.test_ceph
:members:
Neutron Tests
-------------
.. automodule:: fuel_tests.tests.test_neutron
:members:

View File

@ -11,3 +11,4 @@ Documentation for the QA test code repo
base_tests.rst
testrail.rst
system_tests.rst
fuel_tests.rst

0
fuel_tests/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,301 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuelweb_test import logger
from fuelweb_test import settings
from fuelweb_test.helpers.decorators import create_diagnostic_snapshot
from fuelweb_test.helpers.utils import TimeStat
from fuelweb_test.tests.base_test_case import TestBasic as Basic
from system_test.core.discover import load_yaml
class Manager(Basic):
    """Manager class for tests.

    Provides snapshot-aware helpers that bring the devops environment to
    a required state (master node installed, slaves bootstrapped, cluster
    deployed) and reuse existing snapshots whenever possible.
    """

    def __init__(self, config_file, cls):
        """
        :param config_file: path to a cluster template yaml file, or None
        :param cls: test class served by this manager; its ``_storage``
            dict and ``_current_test`` attribute are used to share state
            with the tests (see fuel_tests/tests/conftest.py)
        """
        super(Manager, self).__init__()
        self.full_config = None
        self.env_config = None
        self.env_settings = None
        self.config_name = None
        self._devops_config = None
        self._start_time = 0
        self.config_file = config_file
        if config_file:
            self._load_config()
        self._context = cls

    def _cluster_from_template(self):
        """Create and deploy a cluster described by the template file.

        Reverts the ``ready_cluster_<name>`` snapshot when present,
        otherwise deploys a new cluster on bootstrapped slaves and makes
        that snapshot.

        :raises RuntimeError: if slave nodes could not be prepared
        """
        slaves = int(self.full_config['template']['slaves'])
        cluster_name = self.env_config['name']
        snapshot_name = "ready_cluster_{}".format(cluster_name)
        if self.check_run(snapshot_name):
            self.env.revert_snapshot(snapshot_name)
            cluster_id = self.fuel_web.client.get_cluster_id(cluster_name)
            self._context._storage['cluster_id'] = cluster_id
            logger.info("Got deployed cluster from snapshot")
            return True
        elif self.get_ready_slaves(slaves):
            logger.info("Create env {}".format(
                self.env_config['name']))
            # Translate template sections into nailgun cluster settings.
            cluster_settings = {
                "sahara": self.env_settings['components'].get(
                    'sahara', False),
                "ceilometer": self.env_settings['components'].get(
                    'ceilometer', False),
                "ironic": self.env_settings['components'].get(
                    'ironic', False),
                "user": self.env_config.get("user", "admin"),
                "password": self.env_config.get("password", "admin"),
                "tenant": self.env_config.get("tenant", "admin"),
                "volumes_lvm": self.env_settings['storages'].get(
                    "volume-lvm", False),
                "volumes_ceph": self.env_settings['storages'].get(
                    "volume-ceph", False),
                "images_ceph": self.env_settings['storages'].get(
                    "image-ceph", False),
                "ephemeral_ceph": self.env_settings['storages'].get(
                    "ephemeral-ceph", False),
                "objects_ceph": self.env_settings['storages'].get(
                    "rados-ceph", False),
                "osd_pool_size": str(self.env_settings['storages'].get(
                    "replica-ceph", 2)),
                "net_provider": self.env_config['network'].get(
                    'provider', 'neutron'),
                "net_segment_type": self.env_config['network'].get(
                    'segment-type', 'vlan'),
                "assign_to_all_nodes": self.env_config['network'].get(
                    'pubip-to-all',
                    False),
                "neutron_l3_ha": self.env_config['network'].get(
                    'neutron-l3-ha', False),
                "neutron_dvr": self.env_config['network'].get(
                    'neutron-dvr', False),
                "neutron_l2_pop": self.env_config['network'].get(
                    'neutron-l2-pop', False)
            }
            cluster_id = self.fuel_web.create_cluster(
                name=self.env_config['name'],
                mode=settings.DEPLOYMENT_MODE,
                release_name=self.env_config['release'],
                settings=cluster_settings)
            self._context._storage['cluster_id'] = cluster_id
            logger.info("Add nodes to env {}".format(cluster_id))
            names = "slave-{:02}"
            num = iter(xrange(1, slaves + 1))
            nodes = {}
            for new in self.env_config['nodes']:
                for _ in xrange(new['count']):
                    name = names.format(next(num))
                    # Skip slave names already taken by an earlier group.
                    # NOTE(review): assumes ``assigned_slaves`` is
                    # initialized by the Basic base class — confirm.
                    while name in self.assigned_slaves:
                        name = names.format(next(num))
                    self.assigned_slaves.add(name)
                    nodes[name] = new['roles']
                    logger.info("Set roles {} to node {}".format(
                        new['roles'], name))
            self.fuel_web.update_nodes(cluster_id, nodes)
            self.fuel_web.verify_network(cluster_id)
            self.fuel_web.deploy_cluster_wait(cluster_id)
            self.fuel_web.verify_network(cluster_id)
            self.env.make_snapshot(snapshot_name, is_make=True)
            self.env.resume_environment()
            return True
        else:
            logger.error("Can't deploy cluster because snapshot"
                         " with bootstrapped nodes didn't revert")
            raise RuntimeError("Can't deploy cluster because snapshot"
                               " with bootstrapped nodes didn't revert")

    def _cluster_from_config(self, config):
        """Create and deploy a cluster from a predefined config dict.

        :param config: dict with 'name', 'mode', 'settings' and 'nodes'
        :raises RuntimeError: if slave nodes could not be prepared
        """
        slaves = len(config.get('nodes'))
        cluster_name = config.get('name', self._context.__name__)
        snapshot_name = "ready_cluster_{}".format(cluster_name)
        if self.check_run(snapshot_name):
            self.env.revert_snapshot(snapshot_name)
            cluster_id = self.fuel_web.client.get_cluster_id(cluster_name)
            self._context._storage['cluster_id'] = cluster_id
            logger.info("Got deployed cluster from snapshot")
            return True
        elif self.get_ready_slaves(slaves):
            logger.info("Create env {}".format(cluster_name))
            cluster_id = self.fuel_web.create_cluster(
                name=cluster_name,
                mode=config.get('mode', settings.DEPLOYMENT_MODE),
                settings=config.get('settings', {})
            )
            self._context._storage['cluster_id'] = cluster_id
            self.fuel_web.update_nodes(
                cluster_id,
                config.get('nodes')
            )
            self.fuel_web.verify_network(cluster_id)
            self.fuel_web.deploy_cluster_wait(cluster_id)
            self.fuel_web.verify_network(cluster_id)
            self.env.make_snapshot(snapshot_name, is_make=True)
            self.env.resume_environment()
            return True
        else:
            logger.error("Can't deploy cluster because snapshot"
                         " with bootstrapped nodes didn't revert")
            raise RuntimeError("Can't deploy cluster because snapshot"
                               " with bootstrapped nodes didn't revert")

    def check_run(self, snapshot_name):
        """Check if run of current test is required.

        :param snapshot_name: Name of the snapshot the function should make
        :type snapshot_name: str
        :returns: True/False for a named snapshot; None (falsy) when
            ``snapshot_name`` is empty
        """
        if snapshot_name:
            return self.env.d_env.has_snapshot(snapshot_name)

    def _load_config(self):
        """Read the cluster config from the yaml template file."""
        config = load_yaml(self.config_file)
        self.full_config = config
        self.env_config = config[
            'template']['cluster_template']
        self.env_settings = config[
            'template']['cluster_template']['settings']
        self.config_name = config['template']['name']
        if 'devops_settings' in config['template']:
            # NOTE(review): stores the *whole* config, not just
            # config['template']['devops_settings'] — confirm intended.
            self._devops_config = config

    def get_ready_setup(self):
        """Create the virtual environment and install the Fuel master node.

        Reuses the "empty" snapshot when it exists.
        """
        logger.info("Getting ready setup")
        if self.check_run("empty"):
            self.env.revert_snapshot("empty")
            return True
        else:
            with TimeStat("setup_environment", is_uniq=True):
                self.env.setup_environment()
            self.env.make_snapshot("empty", is_make=True)
            self.env.resume_environment()
            return True

    def get_ready_release(self):
        """Make changes in the release configuration.

        Reuses the "ready" snapshot when it exists.

        :raises RuntimeError: if the base setup could not be prepared
        """
        logger.info("Getting ready release")
        if self.check_run("ready"):
            self.env.revert_snapshot("ready")
            logger.info("Got ready release from snapshot")
            return True
        elif self.get_ready_setup():
            self.fuel_web.get_nailgun_version()
            self.fuel_web.change_default_network_settings()
            if (settings.REPLACE_DEFAULT_REPOS and
                    settings.REPLACE_DEFAULT_REPOS_ONLY_ONCE):
                self.fuel_web.replace_default_repos()
            self.env.make_snapshot("ready", is_make=True)
            self.env.resume_environment()
            return True
        else:
            logger.error("Can't config releases setup "
                         "snapshot didn't revert")
            raise RuntimeError("Can't config releases setup "
                               "snapshot didn't revert")

    def get_ready_slaves(self, slaves=None):
        """Bootstrap slave nodes.

        :param slaves: number of slaves to bootstrap; defaults to the
            template's 'slaves' value
        :returns: True on success, False when the release snapshot could
            not be prepared
        """
        logger.info("Getting ready slaves")
        slaves = slaves or int(self.full_config['template']['slaves'])
        snapshot_name = "ready_with_{}_slaves".format(slaves)
        if self.check_run(snapshot_name):
            self.env.revert_snapshot(snapshot_name)
            logger.info("Got ready slaves from snapshot")
            return True
        elif self.get_ready_release():
            logger.info("Bootstrap {} nodes".format(slaves))
            self.env.bootstrap_nodes(self.env.d_env.nodes().slaves[:slaves],
                                     skip_timesync=True)
            self.env.make_snapshot(snapshot_name, is_make=True)
            self.env.resume_environment()
            return True
        else:
            logger.error("Can't bootstrap nodes because release "
                         "snapshot didn't revert")
            # NOTE: unlike the other helpers this one reports failure via
            # the return value; callers raise themselves on False.
            # (An unreachable ``raise`` that followed this return was
            # removed as dead code.)
            return False

    def get_ready_cluster(self, config=None):
        """Create and deploy a cluster.

        Uses the explicit ``config`` dict when given, then the test
        class's ``cluster_config``, and falls back to the template file.
        """
        logger.info("Getting deployed cluster")
        config = config or self._context.cluster_config or None
        if config:
            self._cluster_from_config(config=config)
        else:
            self._cluster_from_template()

    def show_step(self, step, details='', initialize=False):
        """Show a description of the step taken from the test docstring.

        :param int/str step: step number to show
        :param str details: additional info for a step
        :param bool initialize: reset the internal step counter to ``step``
        """
        test_func = self._context._current_test
        test_func_name = test_func.__name__
        if initialize or step == 1:
            self.current_log_step = step
        else:
            self.current_log_step += 1
            if self.current_log_step != step:
                error_message = 'The step {} should be {} at {}'
                error_message = error_message.format(
                    step,
                    self.current_log_step,
                    test_func_name
                )
                logger.error(error_message)
        # Guard against tests without a docstring instead of crashing.
        docstring = test_func.__doc__ or ''
        docstring = '\n'.join([s.strip() for s in docstring.split('\n')])
        # Map "N. description" lines by their leading step number.
        steps = {s.split('. ')[0]: s for s in
                 docstring.split('\n') if s and s[0].isdigit()}
        if details:
            details_msg = ': {0} '.format(details)
        else:
            details_msg = ''
        if str(step) in steps:
            logger.info("\n" + " " * 55 + "<<< {0} {1}>>>"
                        .format(steps[str(step)], details_msg))
        else:
            logger.info("\n" + " " * 55 + "<<< {0}. (no step description "
                        "in scenario) {1}>>>".format(str(step), details_msg))

    def make_diagnostic_snapshot(self, status, name):
        """Collect a diagnostic (logs) snapshot of the environment."""
        create_diagnostic_snapshot(self.env, status, name)

    def save_env_snapshot(self, name):
        """Make a named devops snapshot of the current environment."""
        self.env.make_snapshot(name, is_make=True)

View File

View File

@ -0,0 +1,134 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import pytest
from fuel_tests.models.manager import Manager
from fuelweb_test import logger
from fuelweb_test import settings
from system_test.core.discover import config_filter
@pytest.fixture(scope='session')
def config_file(request):
    """Session fixture providing the cluster template config for tests.

    Returns the template selected by ``settings.FUELQA_TEMPLATE``, or
    None when no template is configured.
    """
    template = settings.FUELQA_TEMPLATE
    if not template:
        return None
    return config_filter([template])[template]
@pytest.fixture(scope='class', autouse=True)
def manager(request, config_file):
    """Fixture which links a Manager instance to each test class."""
    cls = request.cls
    cls.manager = Manager(config_file, cls)
    cls._storage = dict()
    cls._logger = logger

    def _env_getter(self):
        return self.manager.env

    # Expose the manager's environment as a read-only ``env`` property
    # on the test class.
    cls.env = property(_env_getter)
@pytest.fixture(scope='function', autouse=True)
def snapshot(request):
    """Fixture which provides collection of artifacts after a test.

    Behaviour is driven by two markers:
        ``get_logs``      make a diagnostic (logs) snapshot
        ``fail_snapshot`` save an env snapshot when the test fails
    """
    get_logs = request.keywords.get('get_logs', None)
    fail_snapshot = request.keywords.get('fail_snapshot', None)

    def test_fin():
        # Reports are attached by pytest_runtest_makereport. When setup
        # fails the call phase never runs, so ``rep_call`` does not
        # exist; the original code read it first and raised
        # AttributeError in the finalizer. Check setup first and use
        # getattr guards.
        rep_setup = getattr(request.node, 'rep_setup', None)
        rep_call = getattr(request.node, 'rep_call', None)
        test_name = request.node.function.__name__
        if rep_setup is not None and rep_setup.failed:
            if get_logs:
                request.instance.manager.make_diagnostic_snapshot(
                    status="prepare_failed",
                    name=test_name)
            if fail_snapshot:
                request.instance.manager.save_env_snapshot(
                    name="prep_fail_{}".format(test_name))
        elif rep_call is not None and rep_call.passed:
            if get_logs:
                request.instance.manager.make_diagnostic_snapshot(
                    status="test_pass",
                    name=test_name)
        elif rep_call is not None and rep_call.failed:
            if get_logs:
                request.instance.manager.make_diagnostic_snapshot(
                    status="test_failed",
                    name=test_name)
            if fail_snapshot:
                request.instance.manager.save_env_snapshot(
                    name="fail_{}".format(test_name))
    request.addfinalizer(test_fin)
@pytest.fixture(scope='function', autouse=True)
def prepare(request):
    """Fixture for preparing the environment for a test.

    Two markers are honoured:
        need_ready_cluster  the test needs an already deployed cluster
        need_ready_slaves   the test needs already provisioned slaves
    """
    keywords = request.keywords
    mgr = request.instance.manager
    if keywords.get('need_ready_cluster', None):
        mgr.get_ready_cluster()
    if keywords.get('need_ready_slaves', None):
        mgr.get_ready_slaves()
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Attach the test result report to each test item.

    Stores each phase's report as ``item.rep_<phase>`` (rep_setup,
    rep_call, rep_teardown) so that fixture finalizers (see ``snapshot``)
    can inspect the outcome.
    """
    # execute all other hooks to obtain the report object
    outcome = yield
    rep = outcome.get_result()
    # set a report attribute for each phase of a call, which can
    # be "setup", "call", "teardown"
    setattr(item, "rep_" + rep.when, rep)
def pytest_runtest_setup(item):
    """Hook which runs before the test starts.

    Records the current test function on its class (used by
    ``Manager.show_step``), remembers the start time, and logs a banner
    containing the test docstring (scenario steps).
    """
    item.cls._current_test = item.function
    item._start_time = time.time()
    head = "<" * 5 + "#" * 30 + "[ {} ]" + "#" * 30 + ">" * 5
    head = head.format(item.function.__name__)
    # ``__doc__`` may be None; the original ``''.join(__doc__)`` raised
    # TypeError in that case and was a no-op for strings anyway.
    steps = item.function.__doc__ or ''
    start_step = "\n{head}\n{steps}".format(head=head, steps=steps)
    logger.info(start_step)
def pytest_runtest_teardown(item):
    """Hook which runs after the test; logs the elapsed time."""
    step_name = item.function.__name__
    spent_time = time.time() - item._start_time
    # int() avoids float-division noise ("1.0 min") in the log banner.
    minutes = int(spent_time // 60)
    seconds = int(round(spent_time)) % 60
    finish_step = "FINISH {} STEP TOOK {} min {} sec".format(
        step_name, minutes, seconds)
    foot = "\n" + "<" * 5 + "#" * 30 + "[ {} ]" + "#" * 30 + ">" * 5
    foot = foot.format(finish_step)
    logger.info(foot)

View File

@ -0,0 +1,111 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pytest
from fuelweb_test import logger
from fuelweb_test import settings
from fuelweb_test.helpers import checkers
from fuelweb_test.helpers.ssh_manager import SSHManager
# Module-level SSH helper shared by all tests in this file.
ssh_manager = SSHManager()
class TestCephRadosGW(object):
    """Test class containing the tests for a cluster with Ceph and RadosGW."""

    # This cluster config is used for all tests in this class; it is
    # consumed by the need_ready_cluster fixture (see conftest.py).
    cluster_config = {
        'name': "TestCephRadosGW",
        'mode': settings.DEPLOYMENT_MODE,
        'settings': {
            'volumes_lvm': False,
            'volumes_ceph': True,
            'images_ceph': True,
            'objects_ceph': True,
            'tenant': 'rados',
            'user': 'rados',
            'password': 'rados'
        },
        'nodes': {
            'slave-01': ['controller'],
            'slave-02': ['controller'],
            'slave-03': ['controller'],
            'slave-04': ['compute', 'ceph-osd'],
            'slave-05': ['compute', 'ceph-osd'],
            'slave-06': ['compute', 'ceph-osd']
        }
    }

    @pytest.mark.get_logs
    @pytest.mark.fail_snapshot
    @pytest.mark.need_ready_cluster
    @pytest.mark.pytest_bvt_2
    def test_ceph_rados_gw(self):
        """Deploy ceph HA with RadosGW for objects

        Scenario:
            1. Create cluster with Neutron
            2. Add 3 nodes with controller role
            3. Add 3 nodes with compute and ceph-osd role
            4. Deploy the cluster
            5. Network check
            6. Check HAProxy backends
            7. Check ceph status
            8. Run OSTF tests
            9. Check the radosgw daemon is started

        Duration 90m
        """
        # Steps 1-5 are performed by the need_ready_cluster fixture;
        # show_step only logs them here for traceability.
        self.manager.show_step(1)
        self.manager.show_step(2)
        self.manager.show_step(3)
        self.manager.show_step(4)
        self.manager.show_step(5)
        # HAProxy backend checking
        self.manager.show_step(6)
        fuel_web = self.manager.fuel_web
        controller_nodes = fuel_web.get_nailgun_cluster_nodes_by_roles(
            self._storage['cluster_id'], ['controller'])
        for node in controller_nodes:
            logger.info("Check all HAProxy backends on {}".format(
                node['meta']['system']['fqdn']))
            haproxy_status = checkers.check_haproxy_backend(node['ip'])
            msg = "HAProxy backends are DOWN. {0}".format(haproxy_status)
            # NOTE(review): exit code 1 is treated as success here —
            # presumably "no broken backends found"; confirm against
            # checkers.check_haproxy_backend.
            assert haproxy_status['exit_code'] == 1, msg
        self.manager.show_step(7)
        fuel_web.check_ceph_status(self._storage['cluster_id'])
        self.manager.show_step(8)
        # Run ostf
        fuel_web.run_ostf(cluster_id=self._storage['cluster_id'],
                          test_sets=['ha', 'smoke', 'sanity'])
        self.manager.show_step(9)
        # Check the radosgw daemon is started on every controller
        # (pkill -0 only probes for the process, it does not signal it).
        for node in controller_nodes:
            logger.info("Check radosgw daemon is started on {}".format(
                node['meta']['system']['fqdn']))
            cmd = "pkill -0 radosgw"
            ip = node['ip']
            err_msg = "radosgw daemon not started on {}".format(
                node['meta']['system']['fqdn'])
            ssh_manager.execute_on_remote(ip=ip, cmd=cmd, err_msg=err_msg)

View File

@ -0,0 +1,196 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pytest
from fuelweb_test import logger
from fuelweb_test import settings
from fuelweb_test.helpers import checkers
from fuelweb_test.helpers import os_actions
from fuelweb_test.helpers.ssh_manager import SSHManager
# Module-level SSH helper shared by all tests in this file.
ssh_manager = SSHManager()
@pytest.mark.get_logs
@pytest.mark.fail_snapshot
@pytest.mark.need_ready_cluster
@pytest.mark.ha_neutron
class TestNeutronTunHa(object):
    """NeutronTunHa.

    HA deployment tests for Neutron with the TUN segmentation type
    (3 controllers + 2 computes).

    Old groups: ha_neutron, neutron, ha, classic_provisioning
    """

    # Cluster configuration consumed by the need_ready_cluster fixture
    # (see conftest.py) for every test in this class.
    cluster_config = {
        "name": "NeutronTunHa",
        "mode": settings.DEPLOYMENT_MODE,
        "settings": {
            'net_provider': settings.NEUTRON,
            'net_segment_type': settings.NEUTRON_SEGMENT['tun'],
            'tenant': 'haTun',
            'user': 'haTun',
            'password': 'haTun'
        },
        "nodes": {
            'slave-01': ['controller'],
            'slave-02': ['controller'],
            'slave-03': ['controller'],
            'slave-04': ['compute'],
            'slave-05': ['compute']
        }
    }

    @pytest.mark.deploy_neutron_gre_ha
    @pytest.mark.ha_neutron_gre
    def test_deploy_neutron_gre_ha(self):
        """Deploy cluster in HA mode with Neutron TUN

        Scenario:
            1. Create cluster
            2. Add 3 nodes with controller role
            3. Add 2 nodes with compute role
            4. Deploy the cluster
            5. Run network verification
            6. Run OSTF

        Duration 80m
        Snapshot deploy_neutron_gre_ha
        """
        # Steps 1-5 are performed by the need_ready_cluster fixture;
        # show_step only logs them here for traceability.
        self.manager.show_step(1)
        self.manager.show_step(2)
        self.manager.show_step(3)
        self.manager.show_step(4)
        self.manager.show_step(5)
        cluster_id = self._storage['cluster_id']
        fuel_web = self.manager.fuel_web
        cluster = fuel_web.client.get_cluster(cluster_id)
        assert str(cluster['net_provider']) == settings.NEUTRON
        devops_node = fuel_web.get_nailgun_primary_node(
            self.env.d_env.nodes().slaves[0])
        logger.debug("devops node name is {0}".format(devops_node.name))
        _ip = fuel_web.get_nailgun_node_by_name(devops_node.name)['ip']
        # Retry the swift ring check up to 5 times, rebalancing between
        # attempts; the for-else final check raises if all retries fail.
        for _ in range(5):
            try:
                checkers.check_swift_ring(_ip)
                break
            except AssertionError:
                cmd = "/usr/local/bin/swift-rings-rebalance.sh"
                result = ssh_manager.execute(ip=_ip, cmd=cmd)
                logger.debug("command execution result is {0}"
                             .format(result['exit_code']))
        else:
            checkers.check_swift_ring(_ip)
        self.manager.show_step(6)
        fuel_web.run_ostf(
            cluster_id=cluster_id,
            test_sets=['ha', 'smoke', 'sanity'])
@pytest.mark.get_logs
@pytest.mark.fail_snapshot
@pytest.mark.need_ready_cluster
@pytest.mark.ha_neutron
class TestNeutronVlanHa(object):
    """NeutronVlanHa.

    HA deployment tests for Neutron with the VLAN segmentation type
    (3 controllers + 2 computes).

    Old groups: neutron, ha, ha_neutron
    """

    # Cluster configuration consumed by the need_ready_cluster fixture
    # (see conftest.py) for every test in this class.
    cluster_config = {
        "name": "NeutronVlanHa",
        "mode": settings.DEPLOYMENT_MODE,
        "settings": {
            "net_provider": settings.NEUTRON,
            "net_segment_type": settings.NEUTRON_SEGMENT['vlan'],
            'tenant': 'haVlan',
            'user': 'haVlan',
            'password': 'haVlan'
        },
        "nodes": {
            'slave-01': ['controller'],
            'slave-02': ['controller'],
            'slave-03': ['controller'],
            'slave-04': ['compute'],
            'slave-05': ['compute']
        }
    }

    @pytest.mark.deploy_neutron_vlan_ha
    @pytest.mark.neutron_vlan_ha
    def test_deploy_neutron_vlan_ha(self):
        """Deploy cluster in HA mode with Neutron VLAN

        Scenario:
            1. Create cluster
            2. Add 3 nodes with controller role
            3. Add 2 nodes with compute role
            4. Deploy the cluster
            5. Run network verification
            6. Run OSTF

        Duration 80m
        Snapshot deploy_neutron_vlan_ha
        """
        # Steps 1-5 are performed by the need_ready_cluster fixture;
        # show_step only logs them here for traceability.
        self.manager.show_step(1)
        self.manager.show_step(2)
        self.manager.show_step(3)
        self.manager.show_step(4)
        self.manager.show_step(5)
        cluster_id = self._storage['cluster_id']
        fuel_web = self.manager.fuel_web
        cluster = fuel_web.client.get_cluster(cluster_id)
        assert str(cluster['net_provider']) == settings.NEUTRON
        # OpenStack credentials come from this class's cluster_config.
        os_conn = os_actions.OpenStackActions(
            fuel_web.get_public_vip(cluster_id),
            user=self.cluster_config['settings']['user'],
            passwd=self.cluster_config['settings']['password'],
            tenant=self.cluster_config['settings']['tenant'])
        fuel_web.check_fixed_network_cidr(
            cluster_id, os_conn)
        fuel_web.verify_network(cluster_id)
        devops_node = fuel_web.get_nailgun_primary_node(
            self.env.d_env.nodes().slaves[0])
        logger.debug("devops node name is {0}".format(devops_node.name))
        _ip = fuel_web.get_nailgun_node_by_name(devops_node.name)['ip']
        # Retry the swift ring check up to 5 times, rebalancing between
        # attempts; the for-else final check raises if all retries fail.
        for _ in range(5):
            try:
                checkers.check_swift_ring(_ip)
                break
            except AssertionError:
                cmd = "/usr/local/bin/swift-rings-rebalance.sh"
                result = ssh_manager.execute(ip=_ip, cmd=cmd)
                logger.debug("command execution result is {0}"
                             .format(result['exit_code']))
        else:
            checkers.check_swift_ring(_ip)
        self.manager.show_step(6)
        fuel_web.run_ostf(
            cluster_id=cluster_id, test_sets=['ha', 'smoke', 'sanity'])

View File

@ -19,3 +19,4 @@ launchpadlib
beautifulsoup4>=4.2.0
requests>=2.2.0
joblib>=0.8.4
pytest>=2.9

View File

@ -27,6 +27,7 @@ def get_var_as_bool(name, default):
# Default timezone for clear logging
TIME_ZONE = 'UTC'
FUELQA_TEMPLATE = os.environ.get("FUELQA_TEMPLATE", None)
ENV_NAME = os.environ.get("ENV_NAME", "fuel_system_test")
VIRTUAL_ENV = os.environ.get("VIRTUAL_ENV", "")

3
pytest.ini Normal file
View File

@ -0,0 +1,3 @@
[pytest]
addopts = -vvv -s -p no:django -p no:ipdb --junit-xml=nosetests.xml
testpaths = fuel_tests

View File

@ -3,6 +3,8 @@
import sys
import argparse
import pytest
from proboscis import TestProgram
from proboscis import register
@ -40,7 +42,7 @@ def print_explain(names):
print(pretty_log(out))
def clean_argv():
def clean_argv_proboscis():
"""Removing argv params unused by Proboscis"""
argv = sys.argv
if '--with-config' in argv:
@ -54,6 +56,21 @@ def clean_argv():
return argv
def group_in_pytest(group):
    """Return True if ``group`` is a pytest mark used in fuel_tests.

    Collects the whole pytest session (testpaths from pytest.ini) and
    gathers every mark name applied to a collected item, so the runner
    can decide whether to dispatch a group to pytest or to proboscis.

    NOTE(review): relies on pytest *private* APIs (_prepareconfig,
    Session, FixtureManager, MarkMapping) which change between pytest
    releases — pin the pytest version or verify on upgrade.
    """
    from _pytest.config import _prepareconfig
    from _pytest.main import Session
    from _pytest.python import FixtureManager
    from _pytest.mark import MarkMapping

    config = _prepareconfig(args="")
    session = Session(config)
    session._fixturemanager = FixtureManager(session)
    # One list of mark names per collected test item.
    l = [list(MarkMapping(i.keywords)._mymarks) for i
         in session.perform_collect()]
    # Flatten into the set of all mark names seen in the session.
    groups = set([item for sublist in l for item in sublist])
    return group in groups
def cli():
cli = argparse.ArgumentParser(prog="System test runner",
description="Command line tool for run Fuel "
@ -122,6 +139,8 @@ def run(**kwargs):
groups_to_run = []
groups.extend(old_groups or [])
for g in set(groups):
if group_in_pytest(g):
sys.exit(pytest.main('-m {}'.format(g)))
if config_name:
register_system_test_cases(
groups=[g],
@ -139,7 +158,7 @@ def run(**kwargs):
else:
register(groups=["run_system_test"], depends_on_groups=groups_to_run)
TestProgram(groups=['run_system_test'],
argv=clean_argv()).run_and_exit()
argv=clean_argv_proboscis()).run_and_exit()
def explain_group(**kwargs):