tempest-stress/tempest_stress/driver.py

268 lines
10 KiB
Python

# Copyright 2013 Quanta Research Cambridge, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import os
import signal
import time
from oslo_log import log as logging
from oslo_utils import importutils
import six
from tempest import clients
from tempest.common import credentials_factory as credentials
from tempest import config
from tempest import exceptions
from tempest.lib.common import cred_client
from tempest.lib.common import ssh
from tempest.lib.common.utils import data_utils
from tempest_stress import cleanup
from tempest_stress import config as stress_cfg
CONF = config.CONF
STRESS_CONF = stress_cfg.CONF
LOG = logging.getLogger(__name__)
processes = []
def do_ssh(command, host, ssh_user, ssh_key=None):
ssh_client = ssh.Client(host, ssh_user, key_filename=ssh_key)
try:
return ssh_client.exec_command(command)
except exceptions.SSHExecCommandFailed:
LOG.error('do_ssh raise exception. command:%s, host:%s.'
% (command, host))
return None
def _get_compute_nodes(controller, ssh_user, ssh_key=None):
"""Returns a list of active compute nodes.
List is generated by running nova-manage on the controller.
"""
nodes = []
cmd = "nova-manage service list | grep ^nova-compute"
output = do_ssh(cmd, controller, ssh_user, ssh_key)
if not output:
return nodes
# For example: nova-compute xg11eth0 nova enabled :-) 2011-10-31 18:57:46
# This is fragile but there is, at present, no other way to get this info.
for line in output.split('\n'):
words = line.split()
if len(words) > 0 and words[4] == ":-)":
nodes.append(words[1])
return nodes
def _has_error_in_logs(logfiles, nodes, ssh_user, ssh_key=None,
stop_on_error=False):
"""Detect errors in nova log files on the controller and compute nodes."""
grep = 'egrep "ERROR|TRACE" %s' % logfiles
ret = False
for node in nodes:
errors = do_ssh(grep, node, ssh_user, ssh_key)
if len(errors) > 0:
LOG.error('%s: %s' % (node, errors))
ret = True
if stop_on_error:
break
return ret
def sigchld_handler(signalnum, frame):
"""Signal handler (only active if stop_on_error is True)."""
for process in processes:
if (not process['process'].is_alive() and
process['process'].exitcode != 0):
signal.signal(signalnum, signal.SIG_DFL)
terminate_all_processes()
break
def terminate_all_processes(check_interval=20):
"""Goes through the process list and terminates all child processes."""
LOG.info("Stopping all processes.")
for process in processes:
if process['process'].is_alive():
try:
process['process'].terminate()
except Exception:
pass
time.sleep(check_interval)
for process in processes:
if process['process'].is_alive():
try:
pid = process['process'].pid
LOG.warning("Process %d hangs. Send SIGKILL." % pid)
os.kill(pid, signal.SIGKILL)
except Exception:
pass
process['process'].join()
def stress_openstack(tests, duration, max_runs=None, stop_on_error=False):
"""Workload driver. Executes an action function against a nova-cluster."""
admin_manager = credentials.AdminManager()
ssh_user = STRESS_CONF.stress.target_ssh_user
ssh_key = STRESS_CONF.stress.target_private_key_path
logfiles = STRESS_CONF.stress.target_logfiles
log_check_interval = int(STRESS_CONF.stress.log_check_interval)
default_thread_num = int(
STRESS_CONF.stress.default_thread_number_per_action)
if logfiles:
controller = STRESS_CONF.stress.target_controller
computes = _get_compute_nodes(controller, ssh_user, ssh_key)
for node in computes:
do_ssh("rm -f %s" % logfiles, node, ssh_user, ssh_key)
skip = False
for test in tests:
for service in test.get('required_services', []):
if not CONF.service_available.get(service):
skip = True
break
if skip:
break
# TODO(andreaf) This has to be reworked to use the credential
# provider interface. For now only tests marked as 'use_admin' will
# work.
if test.get('use_admin', False):
manager = admin_manager
else:
raise NotImplementedError('Non admin tests are not supported')
for p_number in range(test.get('threads', default_thread_num)):
if test.get('use_isolated_tenants', False):
username = data_utils.rand_name("stress_user")
tenant_name = data_utils.rand_name("stress_tenant")
password = "pass"
if CONF.identity.auth_version == 'v2':
identity_client = admin_manager.identity_client
projects_client = admin_manager.tenants_client
roles_client = admin_manager.roles_client
users_client = admin_manager.users_client
domains_client = None
else:
identity_client = admin_manager.identity_v3_client
projects_client = admin_manager.projects_client
roles_client = admin_manager.roles_v3_client
users_client = admin_manager.users_v3_client
domains_client = admin_manager.domains_client
domain = (identity_client.auth_provider.credentials.
get('project_domain_name', 'Default'))
credentials_client = cred_client.get_creds_client(
identity_client, projects_client, users_client,
roles_client, domains_client, project_domain_name=domain)
project = credentials_client.create_project(
name=tenant_name, description=tenant_name)
user = credentials_client.create_user(username, password,
project, "email")
# Add roles specified in config file
for conf_role in CONF.auth.tempest_roles:
credentials_client.assign_user_role(user, project,
conf_role)
creds = credentials_client.get_credentials(user, project,
password)
manager = clients.Manager(credentials=creds)
test_obj = importutils.import_class(test['action'])
test_run = test_obj(manager, max_runs, stop_on_error)
kwargs = test.get('kwargs', {})
test_run.setUp(**dict(six.iteritems(kwargs)))
LOG.debug("calling Target Object %s" %
test_run.__class__.__name__)
mp_manager = multiprocessing.Manager()
shared_statistic = mp_manager.dict()
shared_statistic['runs'] = 0
shared_statistic['fails'] = 0
p = multiprocessing.Process(target=test_run.execute,
args=(shared_statistic,))
process = {'process': p,
'p_number': p_number,
'action': test_run.action,
'statistic': shared_statistic}
processes.append(process)
p.start()
if stop_on_error:
# NOTE(mkoderer): only the parent should register the handler
signal.signal(signal.SIGCHLD, sigchld_handler)
end_time = time.time() + duration
had_errors = False
try:
while True:
if max_runs is None:
remaining = end_time - time.time()
if remaining <= 0:
break
else:
remaining = log_check_interval
all_proc_term = True
for process in processes:
if process['process'].is_alive():
all_proc_term = False
break
if all_proc_term:
break
time.sleep(min(remaining, log_check_interval))
if stop_on_error:
if any([True for proc in processes
if proc['statistic']['fails'] > 0]):
break
if not logfiles:
continue
if _has_error_in_logs(logfiles, computes, ssh_user, ssh_key,
stop_on_error):
had_errors = True
break
except KeyboardInterrupt:
LOG.warning("Interrupted, going to print statistics and exit ...")
if stop_on_error:
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
terminate_all_processes()
sum_fails = 0
sum_runs = 0
LOG.info("Statistics (per process):")
for process in processes:
if process['statistic']['fails'] > 0:
had_errors = True
sum_runs += process['statistic']['runs']
sum_fails += process['statistic']['fails']
print("Process %d (%s): Run %d actions (%d failed)" % (
process['p_number'],
process['action'],
process['statistic']['runs'],
process['statistic']['fails']))
print("Summary:")
print("Run %d actions (%d failed)" % (sum_runs, sum_fails))
if not had_errors and STRESS_CONF.stress.full_clean_stack:
LOG.info("cleaning up")
cleanup.cleanup()
if had_errors:
return 1
else:
return 0