Remove ansible module import

Change-Id: I318311d6697f099ee4cb013e3d5a2ed60cc23ca8
This commit is contained in:
Vinod Pandarinathan 2017-02-16 16:00:14 -08:00
parent 08772ac0da
commit 35b5cbd0dd
32 changed files with 154 additions and 2953 deletions

6
.gitignore vendored
View File

@ -51,3 +51,9 @@ ChangeLog
*~
.*.swp
.*sw?
.idea/cloudpulse.iml
.idea/misc.xml
.idea/modules.xml
.idea/vcs.xml
.idea/workspace.xml
.idea/inspectionProfiles/profiles_settings.xml

View File

@ -1,313 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import ansible.constants as CONST
import ansible.inventory
import ansible.runner
import cloudpulse
from cloudpulse.scenario.plugins.security_pulse.util.\
security_pulse_test_input import security_test_input_reader
import json
import os
def get_temp_path():
base_dir = os.path.dirname(cloudpulse.__file__)
try:
config_file = base_dir + '/scenario/plugins/security_pulse/config/' +\
'securityhealth_test_input.yaml'
input_reader = security_test_input_reader(config_file)
input_data = input_reader.process_security_input_file()
return input_data['global_data']['file_info_dir']
except Exception:
print ("Exception while getting temp path..")
return "/var/sec_hc/"
CONST.HOST_KEY_CHECKING = False
TMP_LOCATION = get_temp_path()
is_containerized = False
class ansible_runner(object):
def __init__(self, os_node_list=[]):
self.openstack_node = os_node_list
self.remote_user = None
self.inventory = None
def execute_cmd(self, command, file_list=[], ips=[], roles=[],
container_name=None):
inventory = None
filetered_os_list = []
if ips:
filetered_os_list = self.get_os_node_list(ip_list=ips)
elif roles:
filetered_os_list = self.get_os_node_list(role_list=roles)
else:
filetered_os_list = self.openstack_node
# print filetered_os_list
if filetered_os_list:
inventory = self.init_ansible_inventory(filetered_os_list)
if inventory:
self.inventory = inventory
if is_containerized:
self.execute("mkdir " + TMP_LOCATION,
container_name=container_name)
for f in file_list:
self.copy(f, TMP_LOCATION, container_name=container_name)
out = self.execute(command, container_name=container_name)
print (out)
# remove the files from containers
self.execute("rm -rf " + TMP_LOCATION,
container_name=container_name)
if is_containerized:
# remove the files from host
self.execute("rm -rf " + TMP_LOCATION)
return out
def set_ansible_inventory(self, inv):
self.inventory = inv
def set_credential(self, user):
self.remote_user = user
def init_ansible_inventory(self, os_node_list):
ip_list = []
for os_node in os_node_list:
ip_list.append(os_node.getIp())
self.remote_user = os_node.getUser()
inventory = ansible.inventory.Inventory(ip_list)
return inventory
def get_os_node_list(self, ip_list=[], role_list=[]):
filetered_list = []
if not ip_list and not role_list:
return self.openstack_node
if ip_list and self.openstack_node:
for ip in ip_list:
for os_node in self.openstack_node:
if ip == os_node.getIp():
filetered_list.append(os_node)
elif role_list and self.openstack_node:
for role in role_list:
for os_node in self.openstack_node:
if role == os_node.getRole():
filetered_list.append(os_node)
return filetered_list
def copy(self, src, dest, container_name=None):
runner = ansible.runner.Runner(
module_name='copy',
module_args='src=%s dest=%s' % (src, dest),
remote_user=self.remote_user,
inventory=self.inventory,
forks=1,
)
out = runner.run()
print (out)
# copy to container
if is_containerized:
con_runner = self.container_copy(src, dest, container_name)
out1 = con_runner.run()
print (out1)
return out
def container_copy(self, src, dest, container_name):
new_src = TMP_LOCATION + src.split('/')[-1]
dest = dest + src.split('/')[-1]
cmd = "docker exec -i %s sh -c 'cat > %s' < %s" \
% (container_name, dest, new_src)
runner = ansible.runner.Runner(
module_name='shell',
module_args=cmd,
remote_user=self.remote_user,
# remote_pass=self.remote_pass,
inventory=self.inventory,
forks=1,
)
print (cmd)
return runner
def fetch(self, src, dest, flat='yes'):
runner = ansible.runner.Runner(
module_name='fetch',
module_args='src=%s dest=%s flat=%s' % (src, dest, flat),
remote_user=self.remote_user,
inventory=self.inventory,
forks=1,
)
out = runner.run()
return out
# can perform all shell operations Ex: rm /tmp/output
def execute(self, command, container_name=None, roles=[]):
filetered_os_list = []
if roles:
filetered_os_list = self.get_os_node_list(role_list=roles)
self.inventory = self.init_ansible_inventory(filetered_os_list)
if is_containerized and container_name:
command = 'docker exec %s %s' % (container_name, command)
# print command
runner = ansible.runner.Runner(
module_name='shell',
module_args=command,
remote_user=self.remote_user,
inventory=self.inventory,
forks=1,
)
out = runner.run()
return out
def ping(self, container_name=None, roles=[]):
filetered_os_list = []
if roles:
filetered_os_list = self.get_os_node_list(role_list=roles)
self.inventory = self.init_ansible_inventory(filetered_os_list)
runner = ansible.runner.Runner(
module_name='ping',
remote_user=self.remote_user,
inventory=self.inventory,
timeout=30,
forks=1,
)
out = runner.run()
return out
def get_results(self):
result = {}
if not os.path.isdir(TMP_LOCATION + 'output/'):
return result
files = os.walk(TMP_LOCATION + 'output/').next()[1]
for f in files:
try:
result[f] = open(TMP_LOCATION + 'output/' +
f + TMP_LOCATION + 'output', 'r').read()
except IOError:
print ("Error opening the file : " + TMP_LOCATION +
'output/' + f + TMP_LOCATION + 'output')
return result
def validate_results(self, results, checks=None):
results['status'] = 'PASS'
failed_hosts = []
if results['dark']:
failed_hosts.append(results['dark'].keys())
results['status'] = 'FAIL'
results['status_message'] = ''
for node in results['contacted'].keys():
if 'failed' in results['contacted'][node]:
if results['contacted'][node]['failed'] is True:
results['status'] = 'FAIL'
results['status_message'] = " ".join(
[("%s -> %s") % (key, results['dark'][key])
for key in results['dark']])
for node in results['contacted'].keys():
rc = results['contacted'][node].get('rc', None)
if rc is not None and rc != 0:
failed_hosts.append(node)
results['status'] = 'FAIL'
results['status_message'] = results[
'contacted'][node].get('stderr', None)
if checks is None:
# print "No additional checks validated"
return results, failed_hosts
for check in checks:
key = check.keys()[0]
value = check.values()[0]
for node in results['contacted'].keys():
if key in results['contacted'][node].keys():
if results['contacted'][node][key] != value:
failed_hosts.append(node)
results['status'] = 'FAIL'
results['status_message'] = ''
return (results, failed_hosts)
def get_parsed_ansible_output(self, output_data):
if output_data:
return self.get_validated_data(output_data)
else:
msg = {
'message': 'No result from test execution',
'status': 'Fail'}
return (404, json.dumps([msg], []))
def get_validated_data(self, results):
print ("Inside get_validated_data", results)
# final_result = {}
output = []
status = 200 # 'PASS'
###################################################
# First validation is to make sure connectivity to
# all the hosts was ok.
###################################################
if results['dark']:
status = 404 # 'FAIL'
##################################################
# Now look for status 'failed'
##################################################
for node in results['contacted'].keys():
if 'failed' in results['contacted'][node]:
if results['contacted'][node]['failed'] is True:
status = 404 # 'FAIL'
msg = {
'node': node,
'status': 'Fail',
'message': 'Execution failed'}
output.append(msg)
#################################################
# Check for the return code 'rc' for each host.
#################################################
for node in results['contacted'].keys():
rc = results['contacted'][node].get('rc', None)
if rc is not None and rc != 0:
status = 404 # 'FAIL'
node_info = results['contacted'][node]
op = eval(node_info.get('stdout'))
if not op.get('OverallStatus'):
status = 404 # 'FAIL'
try:
res = op.get('result', [])
for tc in res:
tc.update({'node': node})
output.append(tc)
except Exception:
print ("Exception while getting the result" +
" from the ansible output")
return (status, json.dumps(output), [])
"""
if __name__ == '__main__':
os_node_info_obj = openstack_node_info_reader("/home/ubuntu/
sasi/cpulse/cloudpulse/plugins/security_pulse/config/
openstack_config.yaml")
openstack_node_list = os_node_info_obj.get_host_list()
print openstack_node_list
flist=["/home/ubuntu/sasi/cpulse/cloudpulse/plugins/
security_pulse/testcase/TLS_Enablement_Check.py"]
ans_runner = ansible_runner(openstack_node_list)
ans_runner.execute_cmd("python "+TMP_LOCATION+
"TLS_Enablement_Check.py",file_list=flist)
"""

View File

@ -12,17 +12,19 @@
from __future__ import print_function
from cloudpulse.openstack.api.nova_api import NovaHealth
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
from cloudpulse.operator.ansible.openstack_node_info_reader import \
openstack_node_info_reader
from cloudpulse.scenario import base
import json
import errno
import os
from oslo_config import cfg
from oslo_utils import importutils
import re
cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
group='keystone_authtoken')
import shlex
import simplejson
from subprocess import PIPE
from subprocess import Popen
TESTS_OPTS = [
cfg.StrOpt('operator_setup_file',
@ -32,13 +34,13 @@ TESTS_OPTS = [
default=True,
help='enable if the processes are running as containers'),
cfg.StrOpt('rabbit_container',
default='rabbitmq_v1',
default='rabbitmq',
help='name of the rabitmq container'),
cfg.StrOpt('galera_container',
default='mariadb_v1',
default='mariadb',
help='name of the galera cluster container'),
cfg.StrOpt('ceph_container',
default='ceph_v1',
default='ceph',
help='name of the ceph cluster container'),
]
@ -74,9 +76,45 @@ periodic_test_group = cfg.OptGroup(name='periodic_tests',
title='Periodic tests to be run')
CONF.register_opts(PERIODIC_TESTS_OPTS, periodic_test_group)
cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
group='keystone_authtoken')
def execute(command):
try:
command = shlex.split(command)
stdout = None
stderr = None
p = Popen(command, shell=False, stdout=PIPE,
stderr=PIPE,
bufsize=-1, env=os.environ, close_fds=True)
stdout, stderr = p.communicate()
except OSError as e:
if e.errno == errno.ENOENT:
return {'status': 127, 'output': ""}
else:
return {'status': 126, 'output': ""}
if p.returncode == 126 or p.returncode == 127:
stdout = str(b"")
return {'status': p.returncode, 'output': stdout}
def get_container_name(name):
cmd = "ansible -o all -i 127.0.0.1, -a 'docker ps' -u root"
op = execute(cmd)
if op['status']:
return None
dockerps = op['output'].split('\\n')
for line in dockerps:
if name in line:
linear = line.split()
return linear[len(linear) - 1].strip('\n')
return None
class operator_scenario(base.Scenario):
def _get_nova_hypervior_list(self):
importutils.import_module('keystonemiddleware.auth_token')
creds = {}
@ -92,28 +130,26 @@ class operator_scenario(base.Scenario):
def load(self):
self.os_node_info_obj = openstack_node_info_reader(
cfg.CONF.operator_test.operator_setup_file)
openstack_node_list = self.os_node_info_obj.get_host_list()
self.ans_runner = ansible_runner(openstack_node_list)
inventory = self.ans_runner.init_ansible_inventory(openstack_node_list)
self.ans_runner.set_ansible_inventory(inventory)
@base.scenario(admin_only=False, operator=True)
def rabbitmq_check(self):
self.load()
anscmd = "ansible -o all -i 127.0.0.1, -a "
cmd = "rabbitmqctl cluster_status -q"
is_containerized = cfg.CONF.operator_test.containerized
if is_containerized:
rabbit_container = cfg.CONF.operator_test.rabbit_container
cmd = ("docker exec %s %s" % (rabbit_container, cmd))
rabbit_container = get_container_name('rabbitmq')
cmd = ("'docker exec %s %s'" % (rabbit_container, cmd))
out = self.ans_runner.execute(cmd, roles=['controller'])
res, output = self.ans_runner.validate_results(out)
cmd = anscmd + cmd + " -u root "
if res['status'] is 'PASS':
node_status = res['contacted'][
res['contacted'].keys()[0]]['stdout']
node_status_string = node_status.replace('\n', '')
res = execute(cmd)
if not res['status']:
node_status = res['output']
node_status_string = node_status.replace('\\n', '')
node_status_string = node_status_string.replace(' ', '')
nodes = []
running = []
@ -131,112 +167,144 @@ class operator_scenario(base.Scenario):
for x in mathobj.group(1).split(",")]
diffnodes = list(set(nodes) - set(running))
if diffnodes:
return(404, ("Failed Nodes : %s" %
str(diffnodes)))
return (404, ("Failed Nodes : %s" %
str(diffnodes)))
else:
return (200, "Running Nodes : %s" % str(nodes),
['RabbitMQ-server Running'])
else:
return (404, ("RabbitMQ-server test failed :%s" %
res['status_message']), [])
"rabbitmq-service is down", []))
@base.scenario(admin_only=False, operator=True)
def galera_check(self):
self.load()
anscmd = "ansible -o all -i 127.0.0.1, -a "
galera = self.os_node_info_obj.get_galera_details()
cmd = ((r"mysql -u %s -p%s -e 'SHOW STATUS;'|grep "
"wsrep_incoming_addresses") %
cmd = ((r'mysql -u %s -p%s -e "SHOW STATUS;"') %
(galera['username'], galera['password']))
is_containerized = cfg.CONF.operator_test.containerized
if is_containerized:
galera_container = cfg.CONF.operator_test.galera_container
cmd = ("docker exec %s %s" % (galera_container, cmd))
galera_container = get_container_name('mariadb')
out = self.ans_runner.execute(cmd, roles=['controller'])
results, failed_hosts = self.ans_runner.validate_results(out)
cmd = ("'docker exec %s %s'" % (galera_container, cmd))
cmd = anscmd + cmd + ' -u root'
if results['status'] is 'PASS':
galera_status = results['contacted'][
results['contacted'].keys()[0]]['stdout']
galera_status_string = galera_status.replace('\n', '')
mathobj = re.search(r'wsrep_incoming_addresses\s+(.*?)$',
res = execute(cmd)
if not res['status']:
galera_status = res['output']
if 'wsrep_incoming_addresses' not in galera_status:
return (404, ("Galera Cluster Test Failed: %s" %
"Invalid cluster status", []))
galera_status_string = galera_status.replace('\\n', '')
mathobj = re.search(r'wsrep_incoming_addresses\s+(.*?)wsrep.*$',
galera_status_string, re.M | re.I)
nodes = mathobj.group(1)
return (200, "Active Nodes : %s" % nodes,
['Galera Cluster Test Passed'])
else:
return (404, ("Galera Cluster Test Failed: %s" %
results['status_message']), [])
"service access failed", []))
@base.scenario(admin_only=False, operator=True)
def docker_check(self):
self.load()
cmd = "docker ps -aq --filter 'status=exited'"
out = self.ans_runner.execute(cmd)
node_list = self.os_node_info_obj.get_host_list()
results, failed_hosts = self.ans_runner.validate_results(out)
if results['status'] is 'PASS':
docker_failed = {key: results['contacted'][key]['stdout']
for key in results['contacted']
if results['contacted'][key]['stdout']}
nodeip_list = [node.ip for node in node_list]
anscmd = "ansible -o all -i %s -a " % ','.join(nodeip_list)
cmd = "'docker ps -aq --filter %s '" % "status=exited"
cmd = anscmd + cmd + ' -u root'
res = execute(cmd)
docker_failed = None
if not res['status']:
res['output'] = res['output'].split('\n')
output = filter(lambda x: not re.match(r'^\s*$', x), res['output'])
for line in output:
line = line.split('|')
if len(line) < 3:
continue
if 'SUCCESS' not in line[1]:
if docker_failed:
docker_failed = docker_failed + ',' + line[0]
else:
docker_failed = line[0]
else:
line[3] = line[3].replace(' ', '')
line[3] = line[3].replace('(stdout)', '')
if not re.match(r'^\s*$', line[3]):
if docker_failed:
docker_failed = docker_failed + ',' + line[0]
else:
docker_failed = line[0]
if docker_failed:
docker_str = " ".join(["Containers failed in %s : %s" % (
key, docker_failed[key]) for key in docker_failed])
return (404, docker_str, [])
return (404, docker_failed, [])
else:
return (200, "All docker containers are up",
['Docker container Test Passed'])
else:
return (404, ("Docker Check Failed: %s" %
results['status_message']), [])
"docker daemon not accessible", []))
@base.scenario(admin_only=False, operator=True)
def ceph_check(self):
self.load()
storage_nodes_from_ansible_config = [node.name.lower(
) for node in self.os_node_info_obj.get_host_list()
if node.role == "block_storage"]
storage_nodes_from_ansible_config = [node.name.lower()
for node in
self.os_node_info_obj
.get_host_list()
if node.role == "block_storage"]
if storage_nodes_from_ansible_config:
cmd = (r"ceph -f json status")
is_containerized = cfg.CONF.operator_test.containerized
if is_containerized:
ceph_container = cfg.CONF.operator_test.ceph_container
cmd = ("docker exec %s %s" % (ceph_container, cmd))
ceph_container = get_container_name("ceph")
cmd = ("'docker exec %s %s'" % (ceph_container, cmd))
anscmd = "ansible -o all -i 127.0.0.1, -a "
cmd = anscmd + cmd + ' -u root'
out = self.ans_runner.execute(cmd, roles=['controller'])
results, failed_hosts = self.ans_runner.validate_results(out)
res = execute(cmd)
if not res['status']:
ceph_status = res['output']
if results['status'] is 'PASS':
ceph_status = results['contacted'][
results['contacted'].keys()[0]]['stdout']
ceph_status_string = ceph_status.replace('\n', '')
ceph_json = json.loads(ceph_status_string)
ceph_status = ceph_status.replace('\n', '')
ceph_data = ceph_status.split('|')
ceph_str = ceph_data[3].replace(' (stdout) ', '') \
.replace('\\n', '')
ceph_json = simplejson.loads(ceph_str)
overall_status = ceph_json['health']['overall_status']
num_of_osd = ceph_json['osdmap']['osdmap']['num_osds']
num_up_osds = ceph_json['osdmap']['osdmap']['num_up_osds']
if overall_status == 'HEALTH_OK':
return (200, "Overall Status = %s, "
"Cluster status = %s/%s" %
"Cluster status = %s/%s" %
(overall_status, num_up_osds, num_of_osd))
else:
return (404, "Overall Status = %s, "
"Cluster status = %s/%s" %
"Cluster status = %s/%s" %
(overall_status, num_up_osds, num_of_osd))
else:
return (300, ("Ceph cluster test skipped "
"as no dedicated storage found"))
else:
return (300, ("Ceph cluster test skipped "
"as no dedicated storage found"))
@base.scenario(admin_only=False, operator=True)
def node_check(self):
failed_hosts = None
self.load()
nodes_from_ansible_config = [node.name.lower(
) for node in self.os_node_info_obj.get_host_list()
if node.role == "compute"]
nodes_from_ansible_config = [node.name.lower()
for node in
self.os_node_info_obj.get_host_list()
if node.role == "compute"]
nova_hypervisor_list = self._get_nova_hypervior_list()
if nova_hypervisor_list[0] != 200:
return (404, ("Cannot get hypervisor list from "
@ -254,11 +322,19 @@ class operator_scenario(base.Scenario):
return (404, ("Hypervisors in nova hypervisor list are less"
" than configured.nova hypervisor list = %s") %
nodes_from_nova)
out = self.ans_runner.ping()
results, failed_hosts = self.ans_runner.validate_results(out)
if results['status'] is 'PASS':
anscmd = ("ansible -o all -i '%s' -m ping -u root" %
','.join(nova_hypervisor_list[2]))
res = execute(anscmd)
res['output'] = res['output'].split('\n')
output = filter(lambda x: not re.match(r'^\s*$', x), res['output'])
for line in output:
if "SUCCESS" not in line:
failed_hosts = failed_hosts + line.split('|')[0]
if not res['status']:
return (200, "All nodes are up.nova hypervisor list = %s" %
nodes_from_nova)
nodes_from_nova)
else:
msg = "Some nodes are not up"
if failed_hosts:

View File

@ -1,24 +0,0 @@
# control-1:
# ip: 172.31.231.14
# user: root
# password: cisco123
# role: controller
# dirlist: [/etc/keystone,/etc/nova,/etc/neutron]
# compute-1:
# ip: 172.31.231.15
# user: root
# password: cisco123
# role: compute
# dirlist: [/etc/nova,/etc/neutron]
control-1:
ip: 172.29.74.98
user: ubuntu
password: CTO1234!
role: controller
dirlist: [/tmp/keystone,/tmp/nova,/tmp/neutron]
control-2:
ip: 172.31.231.59
user: root
password: cisco123
role: controller
dirlist: [/etc/my.cnf,/etc/my.cnf.d/,/var/lib/mysql/,/var/log/mariadb/mariadb.log,/var/run/mariadb/mariadb.pid]

View File

@ -1,5 +0,0 @@
control-1:
ip: 172.22.191.136
user: root
password: cisco123
role: controller

View File

@ -1,61 +0,0 @@
securityhealth:
global_data:
file_info_dir: /tmp/sec_hc/
common:
perform_on: [controller,compute]
testcase: [tls_enablement_check]
password_encryption_check:
perform_on: [controller]
input:
conf_file: [/etc/keystone/keystone.conf]
filepermission:
perform_on: [controller]
input:
baseline_file: /tmp/sec_hc/os_allnode_baseline
controller_dir: [/etc/keystone,/etc/nova,/etc/neutron]
compute_dir: [/etc/nova,/etc/neutron]
logfile_mode_check:
perform_on: [controller,compute]
input:
conf_file_dir: [/etc/keystone/,/etc/nova/,/etc/neutron/,/etc/glance/]
logrotate_cfg_check:
perform_on: [controller,compute]
input:
ks_admin_token_check:
perform_on: [controller]
input:
tls_enablement_check:
perform_on: [controller]
input:
keystone:
perform_on: [controller]
testcase: [token_mangement.token_deletion,service.service_restart]
token_mangement:
token_deletion:
input:
token_expiration:
input:
token_time: 10
configuration:
configuration_check:
input:
algorithm: md5
service:
service_restart:
input:
horizon:
perform_on: [controller]
testcase:
configuration:
configuration_check:
input:
conffile: [https.conf]
ServerTokens: Prod
ServerSignature: off
TraceEnable: off
mysql:
perform_on: [controller]
testcase: [mysql_tls_enablement_test,mysql_filecheck_test]
mysql_tls_enablement_test:
perform_on: [controller]
input:

View File

@ -1,181 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
from cloudpulse import objects
from cloudpulse.scenario import base
from cloudpulse.scenario.plugins.security_pulse.testcase.file_check_test\
import SecurityFileCheck
from cloudpulse.scenario.plugins.security_pulse.testcase.\
ks_admin_token_check import ks_admin_token_check
from cloudpulse.scenario.plugins.security_pulse.testcase.log_rotate_test \
import log_file_rotate_test
from cloudpulse.scenario.plugins.security_pulse.testcase.logfile_mode_test\
import log_file_mode_check_test
"""
from cloudpulse.scenario.plugins.security_pulse.testcase.mysql_db_test\
import mysql_db_test
"""
from cloudpulse.scenario.plugins.security_pulse.testcase.mysql_tls_enable_test\
import mysql_tls_enablement_test
from cloudpulse.scenario.plugins.security_pulse.testcase.\
password_encryption_test import password_encryption_check
from cloudpulse.scenario.plugins.security_pulse.testcase.tls_enable_test \
import tls_enablement_test
from cloudpulse.scenario.plugins.security_pulse.util import \
security_pulse_test_util as utils
import json
from oslo_config import cfg
TESTS_OPTS = [
cfg.StrOpt('testcase_input_file',
default='',
help='Security testcase input file'),
cfg.StrOpt('testcase_setup_file',
default='/etc/cloudpulse/openstack_config.yaml',
help='setup file for security pulse test case'),
]
CONF = cfg.CONF
security_pulse_test_group = cfg.OptGroup(name='security_pulse_test',
title='Security pulse test' +
' param input file')
CONF.register_group(security_pulse_test_group)
CONF.register_opts(TESTS_OPTS, security_pulse_test_group)
class security_pulse_scenario(base.Scenario):
@base.scenario(admin_only=False, operator=False)
def password_encryption_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
testcase_input_file = result
else:
return result
input_params = utils.get_input_params(
testcase_input_file, "password_encryption_check")
pwd_test = password_encryption_check()
result = pwd_test.perform_password_encryption_test(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def keystone_tls_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
testcase_input_file = result
else:
return result
input_params = utils.get_input_params(
testcase_input_file, "tls_enablement_check")
test = tls_enablement_test()
result = test.perform_tls_enablement_test(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def keystone_admin_token_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
testcase_input_file = result
else:
return result
input_params = utils.get_input_params(
testcase_input_file, "ks_admin_token_check")
test = ks_admin_token_check()
result = test.perform_ks_admin_token_check_test(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def file_comparision_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
testcase_input_file = result
else:
return result
input_params = utils.get_input_params(
testcase_input_file, "filepermission")
test = SecurityFileCheck()
result = test.perform_file_permission_check(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def logfile_mode_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
testcase_input_file = result
else:
return result
input_params = utils.get_input_params(
testcase_input_file, "logfile_mode_check")
test = log_file_mode_check_test()
result = test.perform_log_file_mode_test(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def logfile_rotate_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
testcase_input_file = result
else:
return result
input_params = utils.get_input_params(
testcase_input_file, "logrotate_cfg_check")
test = log_file_rotate_test()
result = test.perform_log_file_rotate_test(input_params)
return result
@base.scenario(admin_only=False, operator=False)
def mysql_tsl_check(self, *args, **kwargs):
status, result = utils.check_for_valid_testcase_input_file()
if status:
testcase_input_file = result
else:
return result
input_params = utils.get_input_params(
testcase_input_file, "mysql_tls_enablement_test")
test = mysql_tls_enablement_test()
result = test.perform_mysql_tls_enablement_test(input_params)
return result
# def mysql_db_check(self, *args, **kwargs):
# status, result = utils.check_for_valid_testcase_input_file()
# if status:
# testcase_input_file = result
# else:
# return result
# input_params = utils.get_input_params(
# testcase_input_file, "mysql_db_test")
# test = mysql_db_test()
# result = test.perform_mysql_db_test(input_params)
# print ("result from mysql_db_check")
# print (result)
# return result
def verbose(self, *args, **kwargs):
context = kwargs['context']
cpulse_id = kwargs['uuid']
cpulse = objects.Cpulse.get(context, cpulse_id)
result_string = cpulse['result']
final_string = ""
for line in result_string.split("\n"):
final_string += line.ljust(40)
result_final = json.loads(final_string)
result_final2 = {"verbose": result_final}
return result_final2
if __name__ == '__main__':
spt = security_pulse_scenario()
spt.password_encryption_check()

View File

@ -1,127 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import ConfigParser
import os
import pwd
import stat
class tls_enable_check(object):
def __init__(self):
pass
def read_tls_config(self, config):
Result = {}
final_result = {}
overall_status = True
try:
config.get("ldap", "use_tls")
except ConfigParser.NoOptionError:
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'Test Case Name': 'TLS',
'Message': 'use_tls option is not enabled',
'Status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
else:
use_tls = config.get("ldap", "use_tls")
if use_tls == 'false':
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'Test Case Name': 'TLS',
'Message': "use_tls option is enabled with 'false' value",
'Status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
elif use_tls == 'true':
ca_dir = None
try:
ca_dir = config.get("ldap", "tls_cacertdir")
except ConfigParser.NoOptionError:
try:
tls_ca_file = config.get("ldap", "tls_cacertfile")
ca_dir = tls_ca_file[:tls_ca_file.rindex('/')]
except ConfigParser.NoOptionError:
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'Test Case Name': 'TLS',
'Message': "Both 'tls_ca_dir' and" +
" 'tls_ca_file' are not defined",
'Status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
if not ca_dir:
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'Test Case Name': 'TLS',
'Message': "Both 'tls_ca_dir' and" +
" 'tls_ca_file' are not defined",
'Status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
else:
for dirName, subdirList, fileList in os.walk(ca_dir):
os.chdir(dirName)
for f1 in fileList:
st = os.stat(f1)
user = pwd.getpwuid(st[stat.ST_UID])[0]
group = pwd.getpwuid(st[stat.ST_GID])[0]
if user != 'keystone' or group != 'keystone':
msg = "Certificate file directory " + \
" user/group permission are user=" + user \
+ ", group=" + group
overall_status = False
final_result.update(
{'OverallStatus': overall_status})
res = {
'Test Case Name': 'TLS',
'Message': msg,
'Status': 'Fail'}
Result.update(res)
final_result.update({'result': [Result]})
print (final_result)
return
final_result.update({'OverallStatus': overall_status})
msg = {
'Test Case Name': 'TLS',
'Message': "TLS is enabled and the Certificate file" +
" permissions are 'keystone'",
'Status': 'Pass'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
if __name__ == '__main__':
tls_enable_check_obj = tls_enable_check()
config = ConfigParser.ConfigParser()
config.read("/etc/keystone/keystone.conf")
tls_enable_check_obj.read_tls_config(config)

View File

@ -1,140 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ast
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
import json
import os
import subprocess
class SecurityFileCheck(object):
def perform_file_permission_check(self, input_params):
try:
print ("Executing the test ", input_params.get('testcase_name'))
final_result = []
final_status = []
final_msg = []
file_info_dir = input_params['global_data']['file_info_dir']
is_containerized = input_params['global_data']['is_containerized']
perform_on = input_params['perform_on']
if perform_on is None or not perform_on:
print ("Perform on should be mentioned either at test level" +
" or test case level")
msg = {'message': 'Perform on should be mentioned either at' +
' test level or test case level'}
return (404, json.dumps([msg]), [])
os_hostobj_list = input_params['os_host_list']
base_dir = os.path.dirname(cloudpulse.__file__)
baseline_file = input_params['baseline_file']
flist = [base_dir +
"/scenario/plugins/security_pulse/testcase/" +
"remote_file_check.py",
base_dir + "/scenario/plugins/security_pulse/testcase/" +
"remote_filecredentials.py",
file_info_dir + "dir_list",
file_info_dir + "os_baseline"]
def ConsolidateResults(flist, container_name=None):
result = ans_runner.execute_cmd(
"python " +
file_info_dir +
"remote_file_check.py ",
file_list=flist, container_name=container_name)
Result = ans_runner.get_parsed_ansible_output(result)
final_status.append(Result[0])
final_result.extend(ast.literal_eval(Result[1]))
final_msg.extend(Result[2])
for p in perform_on:
for obj in os_hostobj_list:
ans_runner = ansible_runner([obj])
if obj.getRole() == p:
os_dir = input_params[p + '_dir']
all_baseline = ast.literal_eval(
open(baseline_file).read())
baseline = all_baseline[p]
open(
file_info_dir +
'os_baseline',
'w').write(
str(baseline))
# if container, make dir list and copy to container
if is_containerized:
for container, os_dir in os_dir.items():
self.createDirList(
os_dir,
file_info_dir)
ConsolidateResults(
flist,
container_name=container)
subprocess.call([
'rm',
file_info_dir +
'dir_list'])
else:
os_dir_list = []
[os_dir_list.extend(d) for d in os_dir.values()]
# os_dir = os_dir.values()
self.createDirList(os_dir_list, file_info_dir)
# flist.append("/tmp/sec_hc/dir_list")
ConsolidateResults(flist)
subprocess.call([
'rm', '-rf',
file_info_dir +
'os_baseline',
file_info_dir +
'output'])
subprocess.call([
'rm',
file_info_dir +
'dir_list'])
if 404 in final_status:
return (404, final_result, final_msg)
else:
return (200, final_result, final_msg)
except Exception as e:
print ("exception in perform_file_permission_check is--", e)
subprocess.call([
'rm', '-rf',
file_info_dir +
'os_baseline',
file_info_dir +
'output'])
subprocess.call([
'rm',
file_info_dir +
'dir_list'])
print (
"Exception occured in executing" +
" perform_file_permission_check")
message = {
'message': 'Test case execution failed due to some exception'}
return (404, json.dumps([message]), [])
def createDirList(self, os_dir, file_info_dir):
if os_dir is not None:
f = open(file_info_dir + 'dir_list', 'w+')
for dir_name in os_dir:
f.write(dir_name + '\n')
f.close()
if __name__ == '__main__':
sec = SecurityFileCheck()
sec.perform_file_permission_check()

View File

@ -1,68 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ast
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
from cloudpulse.operator.ansible import openstack_config_reader as os_cfg
import json
import os
import sys
class BaseLine(object):
def base_line(self, os_baseline_cfg):
try:
oscfg_reader = os_cfg.os_cfg_reader(os_baseline_cfg)
oscfg_reader.setOpenstackNodeIp()
oscfg_reader.printHostList()
openstack_host_list = oscfg_reader.get_host_list()
baseline_data = {}
for host in openstack_host_list:
f = open('/var/sec_hc/dir_list', 'w+')
for dir_name in host.getDirList():
f.write(dir_name + '\n')
f.close()
ans_runner = ansible_runner([host])
# execute_cmd
base_dir = os.path.dirname(cloudpulse.__file__)
base_dir += '/scenario/plugins/security_pulse/testcase'
flist = [base_dir + '/remote_baseline.py',
base_dir + '/remote_filecredentials.py',
'/var/sec_hc/dir_list'
]
results = ans_runner.execute_cmd(
"python " +
'/var/sec_hc/' +
"remote_baseline.py ",
file_list=flist)
# for node in results['contacted'].keys():
role = host.getRole()
node = host.getIp()
data = results['contacted'][node]['stdout']
baseline_data.update({role: ast.literal_eval(data)})
print (baseline_data)
formated_data = json.dumps(baseline_data, indent=4)
open('/var/sec_hc/os_allnode_baseline',
'w+').write(str(formated_data))
except Exception as e:
print (e)
if __name__ == '__main__':
os_cfg_file = sys.argv[1]
sec = BaseLine()
sec.base_line(os_cfg_file)

View File

@ -1,117 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
import os
class keystone_admin_token_check(object):
def __init__(self):
pass
def keystone_admin_token_test(self):
ks_conf_file = "/etc/keystone/keystone.conf"
output = []
Result = {}
final_result = {}
overall_status = True
config = ConfigParser.ConfigParser()
if os.path.exists(ks_conf_file):
try:
config.read(ks_conf_file)
except Exception:
msg = {
'Test Case Name': 'Admin Token',
'Message': 'keystone.conf not found',
'Status': 'Fail'}
Result.update(msg)
overall_status = False
else:
try:
config.get("DEFAULT", "admin_token")
except ConfigParser.NoOptionError:
msg = {
'Test Case Name': 'Admin Token',
'Message': 'Admin Token is not defined',
'Status': 'Pass'}
Result.update(msg)
else:
msg = {
'Test Case Name': 'Admin Token',
'Message': 'Admin Token is defined',
'Status': 'Fail'}
Result.update(msg)
overall_status = False
else:
msg = {
'Test Case Name': 'Admin Token',
'Message': 'keystone.conf not found',
'Status': 'Fail'}
Result.update(msg)
overall_status = False
output.append(Result)
Result = {}
ks_paste_conf_file = "/etc/keystone/keystone-paste.ini"
if os.path.exists(ks_paste_conf_file):
try:
config.read(ks_paste_conf_file)
except Exception:
msg = {
'Test Case Name': 'Admin Token AuthMiddleware',
'Message': 'keystone-paste.ini not found',
'Status': 'Pass'}
Result.update(msg)
else:
try:
config.get("filter:admin_token_auth",
"paste.filter_factory")
except (ConfigParser.NoOptionError,
ConfigParser.NoSectionError):
msg = {
'Test Case Name': 'Admin Token AuthMiddleware',
'Message': 'admin_auth_token not defined',
'Status': 'Pass'}
Result.update(msg)
else:
option = config.get("filter:admin_token_auth",
"paste.filter_factory")
if "AdminTokenAuthMiddleware" in option:
msg = {
'Test Case Name': 'Admin Token AuthMiddleware',
'Message': 'admin_auth_token defined',
'Status': 'Fail'}
Result.update(msg)
overall_status = False
else:
msg = {
'Test Case Name': 'Admin Token AuthMiddleware',
'Message': 'admin_auth_token not defined',
'Status': 'Pass'}
Result.update(msg)
else:
msg = {
'Test Case Name': 'Admin Token AuthMiddleware',
'Message': 'keystone-paste.ini not found',
'Status': 'Pass'}
Result.update(msg)
output.append(Result)
final_result.update({'OverallStatus': overall_status})
final_result.update({'result': output})
print (final_result)
if __name__ == '__main__':
keystone_admin_token_check_obj = keystone_admin_token_check()
keystone_admin_token_check_obj.keystone_admin_token_test()

View File

@ -1,54 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
import json
import os
import subprocess
class ks_admin_token_check(object):
def perform_ks_admin_token_check_test(self, input_params):
print ("Executing the test ", input_params.get('testcase_name'))
file_info_dir = input_params['global_data']['file_info_dir']
is_containerized = input_params['global_data']['is_containerized']
perform_on = input_params['perform_on']
if perform_on is None or not perform_on:
print ("Perform on should be mentioned either at \
test level or test case level")
msg = {
'message': 'Perform on should be mentioned either ' +
'at test level or test case level'}
return (404, json.dumps([msg]), [])
os_hostobj_list = input_params['os_host_list']
base_dir = os.path.dirname(cloudpulse.__file__)
flist = [base_dir +
"/scenario/plugins/security_pulse/testcase/" +
"keystone_admin_token_check.py"]
ans_runner = ansible_runner(os_hostobj_list)
container_name = None
if is_containerized:
container_name = input_params['input']['container_name']
result = ans_runner.execute_cmd(
"python " +
file_info_dir +
"keystone_admin_token_check.py ",
file_list=flist, container_name=container_name)
Result = ans_runner.get_parsed_ansible_output(result)
subprocess.call(['rm', '-rf', file_info_dir + 'output'])
return Result

View File

@ -1,58 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
import json
import os
import subprocess
class log_file_rotate_test(object):
def perform_log_file_rotate_test(self, input_params):
try:
print ("Executing the test ", input_params.get('testcase_name'))
file_info_dir = input_params['global_data']['file_info_dir']
perform_on = input_params['perform_on']
if perform_on is None or not perform_on:
print ("Perform on should be mentioned either at test level \
or test case level")
message = {
'message': 'Perform on should be mentioned either at \
test level or test case level'}
return (404, json.dumps([message]), [])
os_hostobj_list = input_params['os_host_list']
base_dir = os.path.dirname(cloudpulse.__file__)
flist = [base_dir +
"/scenario/plugins/security_pulse/testcase/" +
"remote_logrotate_check.py"]
ans_runner = ansible_runner(os_hostobj_list)
result = ans_runner.execute_cmd(
"python " +
file_info_dir +
"remote_logrotate_check.py ",
file_list=flist)
Result = ans_runner.get_parsed_ansible_output(result)
cmd = ['rm', '-rf', file_info_dir]
subprocess.call(cmd)
return Result
except Exception:
print (
"Exception occured in executing" +
" perform_log_file_rotate_test")
message = {
'message': 'Test case execution failed due to some exception'}
return (404, json.dumps([message]), [])

View File

@ -1,81 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ast
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
import json
import os
import subprocess
class log_file_mode_check_test(object):
def perform_log_file_mode_test(self, input_params):
try:
print ("Executing the test ", input_params.get('testcase_name'))
final_result = []
final_status = []
final_msg = []
file_info_dir = input_params['global_data']['file_info_dir']
perform_on = input_params['perform_on']
if perform_on is None or not perform_on:
print ("Perform on should be mentioned either at test level \
or test case level")
message = {
'message': 'Perform on should be mentioned either at \
test level or test case level'}
return (404, json.dumps([message]), [])
os_hostobj_list = input_params['os_host_list']
conf_dir = input_params['conf_file_dir']
base_dir = os.path.dirname(cloudpulse.__file__)
flist = [base_dir +
"/scenario/plugins/security_pulse/testcase/" +
"remote_logmode_check.py",
file_info_dir + "dir_list"]
for p in perform_on:
for obj in os_hostobj_list:
ans_runner = ansible_runner([obj])
if obj.getRole() == p:
self.createDirList(conf_dir, file_info_dir)
result = ans_runner.execute_cmd(
"python " +
file_info_dir +
"remote_logmode_check.py ",
file_list=flist)
Result = ans_runner.get_parsed_ansible_output(result)
final_status.append(Result[0])
final_result.extend(ast.literal_eval(Result[1]))
final_msg.extend(Result[2])
cmd = ['rm', '-rf', file_info_dir + 'dir_list']
subprocess.call(cmd)
if 404 in final_status:
return (404, final_result, final_msg)
else:
return (200, final_result, final_msg)
except Exception:
print (
"Exception occured in executing" +
" perform_log_file_mode_test")
message = {
'message': 'Test case execution failed due to some exception'}
return (404, json.dumps([message]), [])
def createDirList(self, os_dir, file_info_dir):
if os_dir is not None:
f = open(file_info_dir + 'dir_list', 'w+')
for dir_name in os_dir:
f.write(dir_name + '\n')
f.close()

View File

@ -1,59 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
import json
import os
import subprocess
class mysql_tls_enablement_test(object):
def perform_mysql_tls_enablement_test(self, input_params):
try:
file_info_dir = input_params['global_data']['file_info_dir']
is_containerized = input_params['global_data']['is_containerized']
perform_on = input_params['perform_on']
if perform_on is None or not perform_on:
print ("Perform on should be mentioned either at test level \
or test case level")
message = {
'message': 'Perform on should be mentioned either at \
test level or test case level'}
return (404, json.dumps([message]), [])
os_hostobj_list = input_params['os_host_list']
base_dir = os.path.dirname(cloudpulse.__file__)
flist = [base_dir + "/scenario/plugins/security_pulse" +
"/testcase/remote_mysql_tls_enablement_check.py"]
ans_runner = ansible_runner(os_hostobj_list)
container_name = None
if is_containerized:
container_name = input_params['input']['container_name']
result = ans_runner.execute_cmd(
"python " +
file_info_dir +
"remote_mysql_tls_enablement_check.py ",
file_list=flist, container_name=container_name)
Result = ans_runner.get_parsed_ansible_output(result)
subprocess.call(['rm', '-rf', file_info_dir + 'output'])
return Result
except Exception as msg:
print (
"Exception while executing perform_mysql_tls_enablement_test")
print (msg)
message = {
'message': 'Test case execution failed due to some exception'}
return (404, json.dumps([message]), [])

View File

@ -1,58 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
import json
import os
import subprocess
class password_encryption_check(object):
def perform_password_encryption_test(self, input_params):
try:
print ("Executing the test ", input_params.get('testcase_name'))
file_info_dir = input_params['global_data']['file_info_dir']
perform_on = input_params['perform_on']
if perform_on is None or not perform_on:
print ("Perform on should be mentioned either at test level \
or test case level")
message = {
'message': 'Perform on should be mentioned either at \
test level or test case level'}
return (404, json.dumps([message]), [])
os_hostobj_list = input_params['os_host_list']
base_dir = os.path.dirname(cloudpulse.__file__)
flist = [base_dir + "/scenario/plugins/security_pulse" +
"/testcase/remote_password_check.py"]
ans_runner = ansible_runner(os_hostobj_list)
result = ans_runner.execute_cmd(
"python " +
file_info_dir +
"remote_password_check.py ",
file_list=flist)
Result = ans_runner.get_parsed_ansible_output(result)
cmd = ['rm', '-rf', file_info_dir]
subprocess.call(cmd)
return Result
except Exception as e:
print (
"Exception occured in executing" +
" perform_password_encryption_test")
print (Exception, e)
message = {
'message': 'Test case execution failed due to some exception'}
return (404, json.dumps([message]), [])

View File

@ -1,63 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import pwd
import remote_filecredentials as filecredentials
import stat
class FileTraversal(object):
def file_traversal(self, dir_list, file_dir):
try:
output = {}
for dir_name in dir_list:
self.rootDir = dir_name
for dirName, subdirList, fileList in os.walk(self.rootDir):
os.chdir(dirName)
for f1 in fileList:
st = os.stat(f1)
ins = filecredentials.AccessPreveliges(
f1, st[stat.ST_SIZE], oct(
stat.S_IMODE(
st[
stat.ST_MODE])), pwd.getpwuid(
st[stat.ST_UID]), pwd.getpwuid(
st[stat.ST_GID]))
output.update(
{
ins.getName(): {
'size': ins.getSize(),
'mode': ins.getMode(),
'user': ins.getUser(),
'group': ins.getGroup()}})
print (output)
except Exception as e:
print (e)
if __name__ == '__main__':
# LOG.info('Executing test')
file_dir = '/var/sec_hc/'
dirs = []
with open(file_dir + 'dir_list') as f:
dirs = f.read().splitlines()
sec = FileTraversal()
# LOG.info('Executing test1')
sec.file_traversal(dirs, file_dir)

View File

@ -1,129 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ast
import os
import pwd
import remote_filecredentials as filecredentials
import stat
import string
class FileCheck(object):
def file_check(self, dir_list, file_dir):
try:
output = {}
result = []
final_result = {}
overall_status = True
for dir_name in dir_list:
self.rootDir = dir_name
for dirName, subdirList, fileList in os.walk(self.rootDir):
# flist = []
# for f in fileList:
# flist.append(os.path.abspath(os.path.join(dirName,f)))
os.chdir(dirName)
for f1 in fileList: # flist
st = os.stat(f1)
ins = filecredentials.AccessPreveliges(
f1, st[stat.ST_SIZE], oct(
stat.S_IMODE(
st[
stat.ST_MODE])), pwd.getpwuid(
st[stat.ST_UID]), pwd.getpwuid(
st[stat.ST_GID]))
output.update(
{
ins.getName(): {
'size': ins.getSize(),
'mode': ins.getMode(),
'user': ins.getUser(),
'group': ins.getGroup()}})
keystone_baseline = ast.literal_eval(
open(file_dir + 'os_baseline').read())
remote_mismatch = list(set(output.keys()).
difference(keystone_baseline.keys()))
baseline_mismatch = list(set(keystone_baseline.keys()).
difference(output.keys()))
for key in output.keys():
if key in keystone_baseline:
new = output.get(key)
base = keystone_baseline[key]
diffkeys = [k for k in base if base[k] != new[k]]
l = []
for k in diffkeys:
l.append(
'"' +
k +
'"' +
' is modified from ' +
base[k] +
' to ' +
new[k] +
' in remote')
msg = string.join(l, ', ')
if msg:
temp = {'test_case_name': key, 'Status': 'Fail'}
temp.update({'message': msg})
result.append(temp)
if baseline_mismatch:
for item in baseline_mismatch:
msg = 'File not found in remote'
temp = {'test_case_name': item, 'Status': 'Fail'}
temp.update({'message': msg})
result.append(temp)
if remote_mismatch:
for item in remote_mismatch:
msg = 'New file found in remote'
temp = {'test_case_name': item, 'Status': 'Fail'}
temp.update({'message': msg})
result.append(temp)
if not result:
overall_status = True
final_result.update(
{'OverallStatus': overall_status})
result = {}
result.update({'test_case_name': 'File permission Check'})
result.update({'status': 'Pass'})
result.update({'message': 'No mismatch'})
final_result.update({'result': [result]})
print (final_result)
return
else:
final_result.update(
{'OverallStatus': False})
final_result.update({'result': result})
print (final_result)
return
except Exception as e:
final_result.update(
{'OverallStatus': False})
result = {}
result.update({'test_case_name': 'File permission Check'})
result.update({'status': 'Fail'})
result.update(
{'message': 'Exception in file comparision' + str(e)})
final_result.update({'result': [result]})
print (final_result)
return
if __name__ == '__main__':
file_dir = '/var/sec_hc/'
dirs = []
with open(file_dir + 'dir_list') as f:
dirs = f.read().splitlines()
sec = FileCheck()
sec.file_check(dirs, file_dir)

View File

@ -1,39 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class AccessPreveliges(object):
def __init__(self, name=None, size=None, mode=None, user=None, group=None):
self.name = name
self.size = str(size)
self.mode = mode
self.user = user
self.group = group
def getName(self):
return self.name
def getSize(self):
return self.size
def getMode(self):
return self.mode
def getUser(self):
return self.user[0]
def getGroup(self):
return self.group[0]

View File

@ -1,124 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
import os
class LogModeCheck(object):
def log_mode_check(self, dir_list):
try:
result = []
final_result = {}
overall_status = True
if dir_list is None or not dir_list:
res = {'message': 'Directory list is empty',
'status': 'Fail', 'test_case_name': 'Log Mode Check'}
result.append(res)
final_result.update(
{'OverallStatus': False})
final_result.update({'result': result})
print(final_result)
return
config = ConfigParser.ConfigParser()
for dir_name in dir_list:
for dirName, subdirList, files in os.walk(dir_name):
files = [file for file in files if file.
endswith(('.conf', '.ini'))]
for f1 in files:
debug_msg = {}
verbose_msg = {}
abspath = ''
if dirName.endswith("/"):
abspath = dirName + f1
else:
abspath = dirName + "/" + f1
case_name = "Debug Mode check for '" + abspath + "'"
debug_msg.update(
{'test_case_name': case_name})
verbose_msg.update({'test_case_name': "Verbose Mode" +
" check for '" + abspath + "'"})
try:
config.read(abspath)
except Exception:
continue
try:
config.get("DEFAULT", "debug")
except ConfigParser.NoOptionError as e:
msg = 'Debug option is not enabled'
debug_msg.update(
{'message': msg})
debug_msg.update({'status': 'Pass'})
else:
debug = config.get("DEFAULT", "debug")
if debug.lower() == 'false':
msg = "Debug option is enabled with 'false'"
debug_msg.update(
{'message': msg})
debug_msg.update({'status': 'Pass'})
else:
msg = 'Debug option is enabled'
debug_msg.update(
{'message': msg})
debug_msg.update({'status': 'Fail'})
overall_status = False
result.append(debug_msg)
try:
config.get("DEFAULT", "verbose")
except ConfigParser.NoOptionError:
msg = 'Verbose option is not enabled'
verbose_msg.update(
{'message': msg})
verbose_msg.update({'status': 'Pass'})
else:
verbose = config.get("DEFAULT", "verbose")
if verbose.lower() == 'false':
msg = "Verbose option is enabled with 'false'"
verbose_msg.update(
{'message': msg})
verbose_msg.update({'status': 'Pass'})
else:
msg = 'Verbose option is enabled'
verbose_msg.update(
{'message': msg})
verbose_msg.update({'status': 'Fail'})
overall_status = False
result.append(verbose_msg)
final_result.update(
{'OverallStatus': overall_status})
final_result.update({'result': result})
print(final_result)
return
except Exception as e:
final_result.update(
{'OverallStatus': False})
result = {}
result.update({'test_case_name': 'Log Mode Check'})
result.update({'status': 'Fail'})
result.update(
{'message': 'Exception in log mode check' + str(e)})
final_result.update({'result': [result]})
print(final_result)
return
if __name__ == '__main__':
file_dir = '/var/sec_hc/'
dirs = []
with open(file_dir + 'dir_list') as f:
dirs = f.read().splitlines()
sec = LogModeCheck()
sec.log_mode_check(dirs)

View File

@ -1,111 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import subprocess
os_log_cfg_file = {'ceilometer': 'openstack-ceilometer',
'cinder': 'openstack-cinder',
'httpd': 'openstack-dashboard',
'glance': 'openstack-glance',
'keystone': 'openstack-keystone',
'neutron': 'openstack-neutron',
'nova': 'openstack-nova',
'swift': 'openstack-swift',
'rabbitmq': 'rabbitmq-server',
'mariadb': 'mariadb',
'mongodb': 'mongodb',
'heat': 'openstack-heat'}
os_service_name = ['nova', 'cinder', 'httpd', 'glance', 'keystone',
'neutron', 'ceilometer', 'swift', 'rabbitmq',
'mariadb', 'mongodb', 'heat']
logrotaion_dir = "/etc/logrotate.d/"
class LogRotateCheck(object):
def find_openstack_service(self):
output = subprocess.check_output('systemctl | grep -i -e "mongo" \
-e "maria" -e "httpd" -e "rabbit" \
-e "openstack" | tr -s " " \
| cut -d" " -f1', shell=True)
running_service = output.split('\n')
running_service.append('keystone')
service_list = []
for service_name in os_service_name:
r = [s for s in running_service if service_name in s]
if r:
service_list.append(service_name)
return service_list
def log_rotate_check(self):
try:
result = []
final_result = {}
tmp = {}
overall_status = True
service_list = self.find_openstack_service()
for service_name in service_list:
cfg_file = os_log_cfg_file[service_name]
if os.path.exists(logrotaion_dir + cfg_file):
cfg_lines = open(logrotaion_dir + cfg_file, "r").read().\
splitlines()
case_name = 'Log Rotate Check for ' + cfg_file
for line in cfg_lines:
if "/var/log" in line and not line.startswith('#'):
tmp[cfg_file] = "Config Exists"
if cfg_file not in tmp:
overall_status = False
res = {'status': 'Fail'}
res.update(
{'test_case_name': case_name})
res.update({'message': 'No Log Rotation Config Found'})
result.append(res)
else:
res = {'status': 'Pass'}
res.update(
{'test_case_name': case_name})
res.update({'message': 'Log Rotation Config Found'})
result.append(res)
else:
# tmp[cfg_file] = "Log file doesn't exists"
overall_status = False
res = {'status': 'Fail'}
res.update(
{'test_case_name': case_name})
res.update({'message': "Log file doesnot exists"})
result.append(res)
final_result.update(
{'OverallStatus': overall_status})
final_result.update({'result': result})
print(final_result)
return
except Exception:
final_result.update(
{'OverallStatus': False})
result = {}
result.update({'test_case_name': 'Log Rotate Check'})
result.update({'status': 'Fail'})
result.update(
{'message': 'Exception in log rotate check'})
final_result.update({'result': [result]})
print(final_result)
return
if __name__ == '__main__':
lrc = LogRotateCheck()
lrc.log_rotate_check()

View File

@ -1,152 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import ConfigParser
import os
from pwd import getpwuid
import string
class mysql_tls_enable_check(object):
def __init__(self):
self.ssl_status = False
self.SSL_file = '/etc/my.cnf.d/server.cnf'
self.SSLOwner = 'mysql'
self.config = ConfigParser.ConfigParser(allow_no_value=True)
def getSSLStatus(self):
ssl_files = []
Result = {}
final_result = {}
overall_status = True
if os.path.exists(self.SSL_file):
try:
self.config.read(self.SSL_file)
self.ssl_status = self.config.get('mysqld', 'ssl')
if self.ssl_status in ['true', 'True']:
ssl_files.append(self.config.get('mysqld', 'ssl-ca'))
ssl_files.append(self.config.get('mysqld', 'ssl-cert'))
ssl_files.append(self.config.get('mysqld', 'ssl-key'))
file_objs = self.getFileInfo(ssl_files)
if file_objs:
op = self.checkFilePermission(file_objs)
if op:
overall_status = False
final_result.update(
{'OverallStatus': overall_status})
Result.update(
{
'Test Case Name': 'mysql TSL',
'Message': 'SSL is enabled in mysql with \
following mismatch - ' +
string.join(
op,
', '),
'Status': 'Fail'})
final_result.update({'result': [Result]})
print (final_result)
return
else:
overall_status = True
final_result.update(
{'OverallStatus': overall_status})
Result.update({'Test Case Name': 'mysql TSL',
'Message': 'SSL is enabled in \
mysql',
'Status': 'Pass'})
final_result.update({'result': [Result]})
print (final_result)
return
else:
overall_status = False
final_result.update({'OverallStatus': overall_status})
Result.update(
{
'Test Case Name': 'mysql TSL',
'Message': 'SSL is enabled in mysql and not \
able to check the file permission of \
SSL files',
'Status': 'Fail'})
final_result.update({'result': [Result]})
print (final_result)
return
else:
overall_status = False
final_result.update({'OverallStatus': overall_status})
Result.update({'Test Case Name': 'mysql TSL',
'Message': 'SSL is not enabled in mysql',
'Status': 'Fail'})
final_result.update({'result': [Result]})
print (final_result)
return
except Exception:
overall_status = False
final_result.update({'OverallStatus': overall_status})
Result.update({'Test Case Name': 'mysql TSL',
'Message': 'Exception while \
reading ' + self.SSL_file,
'Status': 'Fail'})
final_result.update({'result': [Result]})
print (final_result)
return
else:
overall_status = False
final_result.update({'OverallStatus': overall_status})
Result.update({'Test Case Name': 'mysql TSL',
'Message': self.SSL_file + ' not found',
'Status': 'Fail'})
final_result.update({'result': [Result]})
print (final_result)
return
def formfileObj(self, file_name, stat_file_obj):
file_info = {}
try:
file_info['owner'] = getpwuid(stat_file_obj.st_uid).pw_name
file_info['group_owner'] = getpwuid(stat_file_obj.st_gid).pw_name
except Exception:
pass
return file_info
def getFileInfo(self, files=[]):
file_objs = {}
try:
for f in files:
obj = os.stat(f)
opt = self.formfileObj(f, obj)
file_objs.update({f: opt})
except Exception:
pass
return file_objs
def checkFilePermission(self, file_objs={}):
result = []
for file, obj in file_objs.items():
if obj.get('owner') != self.SSLOwner:
msg = 'File "%s" owner permission is not matching' % (file)
result.append(msg)
if obj.get('group_owner') != self.SSLOwner:
msg = 'File "%s" group owner permision is not matching' % (
file)
result.append(msg)
return result
if __name__ == '__main__':
checkssl = mysql_tls_enable_check()
checkssl.getSSLStatus()

View File

@ -1,103 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
class password_encryption_test(object):
def password_encryption_check(self, config):
overall_status = True
final_result = {}
Result = {}
try:
config.get("token", "hash_algorithm")
except ConfigParser.NoOptionError:
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'test_case_name': 'Hash Algorithm',
'message': 'Hash Algorithm option is commented',
'status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
else:
algo = config.get("token", "hash_algorithm")
if algo == "sha1" or algo == "sha256":
try:
config.get("token", "provider")
except ConfigParser.NoOptionError:
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'test_case_name': 'Provider option',
'message': 'Provider option is not enabled',
'status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
else:
provider = config.get("token", "provider")
if provider == "pki":
final_result.update({'OverallStatus': overall_status})
msg = {
'test_case_name': 'Provider option',
'message': "hash algorithm option enabled " +
" with value " +
algo +
" and provider " +
"using 'pki' ",
'status': 'Pass'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
elif provider == "uuid":
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'test_case_name': 'Provider option',
'message': "hash algorithm option enabled " +
" with value " +
algo +
" and provider " +
"using 'uuid' ",
'status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
elif algo == "md5":
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'test_case_name': 'Provider option',
'message': "hash algorithm option enabled " +
" with value " +
algo,
'status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
if __name__ == '__main__':
pet = password_encryption_test()
config = ConfigParser.ConfigParser()
config.read("/etc/keystone/keystone.conf")
pet.password_encryption_check(config)

View File

@ -1,52 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
import json
import os
import subprocess
class tls_enablement_test(object):
def perform_tls_enablement_test(self, input_params):
print ("Executing the test ", input_params.get('testcase_name'))
file_info_dir = input_params['global_data']['file_info_dir']
is_containerized = input_params['global_data']['is_containerized']
perform_on = input_params['perform_on']
if perform_on is None or not perform_on:
print ("Perform on should be mentioned either at test level " +
"or test case level")
msg = {
'message': 'Perform on should be mentioned either at test ' +
'level or test case level'}
return (404, json.dumps([msg]), [])
os_hostobj_list = input_params['os_host_list']
base_dir = os.path.dirname(cloudpulse.__file__)
flist = [base_dir + "/scenario/plugins/security_pulse" +
"/testcase/TLS_Enablement_Check.py"]
ans_runner = ansible_runner(os_hostobj_list)
container_name = None
if is_containerized:
container_name = input_params['input']['container_name']
result = ans_runner.execute_cmd(
"python " +
file_info_dir +
"TLS_Enablement_Check.py ",
file_list=flist, container_name=container_name)
Result = ans_runner.get_parsed_ansible_output(result)
subprocess.call(['rm', '-rf', file_info_dir + 'output'])
return Result

View File

@ -1,142 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudpulse.scenario.plugins.security_pulse.util.security_test_data \
import security_test
from cloudpulse.scenario.plugins.security_pulse.util.security_testcase_data \
import security_testcase
import yaml
class security_test_input_reader(object):
def __init__(self, fileName):
self.secInputYamlObj = None
self.security_tests = []
try:
fp = open(fileName)
except IOError as e:
print ("Error while opening the file...%s", e)
return
try:
self.secInputYamlObj = yaml.safe_load(fp)
except yaml.error.YAMLError as perr:
print ("Error while parsing...%s", perr)
return
def process_security_input_file(self):
# print self.secInputYamlObj
secTests = self.secInputYamlObj["securityhealth"]
globalVarData = {}
input_data = {}
sec_test_lst = []
for test_key in secTests.keys():
if test_key == "global_data":
for gkey in secTests[test_key].keys():
globalVarData[gkey] = secTests[test_key][gkey]
continue
sec_test_obj = security_test()
sec_test_obj.set_test_name(test_key)
sec_test_case_lst = []
test_data = secTests[test_key]
for test_case_key in test_data.keys():
if test_case_key == "perform_on":
sec_test_obj.set_perform_on(secTests[test_key]
[test_case_key])
elif test_case_key == "testcase":
sec_test_obj.set_test_to_execute(secTests[test_key]
[test_case_key])
else:
security_testcase_obj = security_testcase()
security_testcase_obj.set_test_name(test_case_key)
if "perform_on" in secTests[test_key][test_case_key]:
# print secTests[test_key][test_case_key]["perform_on"]
security_testcase_obj.\
set_perform_on(secTests[test_key]
[test_case_key]
["perform_on"])
test_input_dict = {}
if "input" in secTests[test_key][test_case_key]:
if secTests[test_key][test_case_key]["input"] \
is not None:
for test_case_input_key in \
secTests[test_key][test_case_key]["input"].\
keys():
test_input_dict[test_case_input_key] = \
(secTests[test_key][test_case_key]["input"]
[test_case_input_key])
security_testcase_obj.\
set_input_params(test_input_dict)
sec_test_case_lst.append(security_testcase_obj)
else:
sec_test_case_lst = sec_test_case_lst + \
self.process_testcase_input(test_key,
test_case_key,
secTests)
sec_test_obj.set_security_testcase(sec_test_case_lst)
sec_test_lst.append(sec_test_obj)
# security_test_input_reader.print_test_input(sec_test_lst)
# print globalVarData
input_data['global_data'] = globalVarData
input_data['sec_test_lst'] = sec_test_lst
return input_data
def process_testcase_input(self, test_key, test_case_key, secTests):
sec_test_case_lst = []
# print secTests[test_key][test_case_key]
for sub_test_case_key in (secTests[test_key]
[test_case_key]).keys():
security_testcase_obj = security_testcase()
security_testcase_obj.set_test_name(test_case_key + "." +
sub_test_case_key)
if "perform_on" in (secTests[test_key][test_case_key]
[sub_test_case_key]):
security_testcase_obj.\
set_perform_on(secTests[test_key][test_case_key]
[sub_test_case_key]["perform_on"])
if "input" in secTests[test_key][test_case_key][sub_test_case_key] \
and (secTests[test_key][test_case_key][sub_test_case_key]
["input"]) is not None:
test_input_dict = {}
for test_case_input_key in \
(secTests[test_key][test_case_key]
[sub_test_case_key]["input"]).keys():
test_input_dict[test_case_input_key] = \
(secTests[test_key][test_case_key]
[sub_test_case_key]["input"][test_case_input_key])
security_testcase_obj.set_input_params(test_input_dict)
sec_test_case_lst.append(security_testcase_obj)
return sec_test_case_lst
"""
@staticmethod
def print_test_input(sec_test_lst):
for test_obj in sec_test_lst:
print "TestName : %s " % test_obj.get_test_name()
print "Perform On : %s " % test_obj.get_perform_on()
print "Test to execute : %s " % test_obj.get_test_to_execute()
for test_case_obj in test_obj.get_security_testcase():
print " Test case Name : %s " % test_case_obj.\
get_test_name()
print " Perform On : %s " % test_case_obj.\
get_perform_on()
print " Input Params : %s " % test_case_obj.\
get_input_params()
"""
if __name__ == '__main__':
yhp = security_test_input_reader()
yhp.process_security_input_file()

View File

@ -1,103 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudpulse.operator.ansible.openstack_node_info_reader import \
openstack_node_info_reader
from cloudpulse.scenario.plugins.security_pulse.util.\
security_pulse_test_input import security_test_input_reader
import os
from oslo_config import cfg
TESTS_OPTS = [
cfg.StrOpt('testcase_input_file',
default='',
help='Security testcase input file'),
cfg.StrOpt('testcase_setup_file',
default='/etc/cloudpulse/openstack_config.yaml',
help='setup file for security pulse test case'),
]
CONF = cfg.CONF
security_pulse_test_group = cfg.OptGroup(name='security_pulse_test',
title='Security pulse test' +
' param input file')
CONF.register_group(security_pulse_test_group)
CONF.register_opts(TESTS_OPTS, security_pulse_test_group)
def get_test_input_by_name(testcase_name, input_data):
sec_test_lst = input_data['sec_test_lst']
for test_obj in sec_test_lst:
for test_case_obj in test_obj.get_security_testcase():
if testcase_name == test_case_obj.get_test_name():
input_params = test_case_obj.get_input_params()
input_params['testcase_name'] = testcase_name
if test_case_obj.get_perform_on() is not None:
input_params['perform_on'] = \
test_case_obj.get_perform_on()
else:
input_params['perform_on'] = test_obj.get_perform_on()
input_params['test_name'] = test_obj.get_test_name()
input_params['global_data'] = input_data['global_data']
return input_params
return None
def get_all_openstack_node_list():
openstack_node_list = []
os_node_info_obj = openstack_node_info_reader(
cfg.CONF.security_pulse_test.testcase_setup_file)
openstack_node_list = os_node_info_obj.get_host_list()
return openstack_node_list
def get_input_params(
test_case_input_conf_file=None,
test_input_file=None):
input_params = {}
try:
if test_case_input_conf_file:
input_reader = security_test_input_reader(
test_case_input_conf_file)
input_data = input_reader.process_security_input_file()
input_params = get_test_input_by_name(test_input_file, input_data)
except Exception:
pass
openstack_node_list = get_all_openstack_node_list()
input_params['os_host_list'] = openstack_node_list
return input_params
def check_for_valid_testcase_input_file():
"""Check for valid test case input yaml file
if testcase i/p yaml file is not present return exception msg
else return input yaml file name.
"""
testcase_input_file = ""
try:
testcase_input_file =\
cfg.CONF.security_pulse_test.testcase_input_file
except Exception:
msg = "Exception while reading the testcase input file"
print (msg)
return False, (404, msg, [])
if not os.path.isfile(testcase_input_file):
msg = 'Testcase input file %s not found' % (testcase_input_file)
print (msg)
return False, (404, msg, [])
return True, testcase_input_file

View File

@ -1,47 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class security_test(object):
def __init__(self):
self.test_name = None
self.security_testcase = []
self.perform_on = []
self.test_to_execute = []
def get_test_name(self):
return self.test_name
def get_security_testcase(self):
return self.security_testcase
def set_test_name(self, test_name):
self.test_name = test_name
def set_security_testcase(self, security_testcase):
self.security_testcase = security_testcase
def get_perform_on(self):
return self.perform_on
def set_perform_on(self, perform_on):
self.perform_on = perform_on
def get_test_to_execute(self):
return self.test_to_execute
def set_test_to_execute(self, test_to_execute):
self.test_to_execute = test_to_execute

View File

@ -1,40 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class security_testcase(object):
def __init__(self):
self.test_name = None
self.perform_on = []
self.input_params = {}
def get_test_name(self):
return self.test_name
def set_test_name(self, test_name):
self.test_name = test_name
def get_perform_on(self):
return self.perform_on
def set_perform_on(self, perform_on):
self.perform_on = perform_on
def get_input_params(self):
return self.input_params
def set_input_params(self, input_params):
self.input_params = input_params

View File

@ -1,430 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ansible.runner
import os
import sys
import test_config as config
import test_utils as utils
import time
import unittest
import uuid
class node_info_obj(object):
def __init__(self, host, ip, user, role, name):
self.host = host
self.ip = ip
self.user = user
self.role = role
self.name = name
def getHost(self):
return self.host
def getIp(self):
return self.ip
def getUser(self):
return self.user
def getRole(self):
return self.role
def getName(self):
return self.name
class node_info_reader(object):
def __init__(self, host_file=None):
if host_file is None:
print ("Host file not passed. exit")
sys.exit(0)
self.host_file = utils.get_absolute_path_for_file(__file__, host_file)
if not os.path.exists(self.host_file):
print ("%s file does not exist" % self.host_file)
return
self.parsed_data = utils.create_parsed_yaml(self.host_file)
def get_host_list(self):
host_ip_list = []
for key, data in self.parsed_data.items():
hostname = key
name = key
ip = data.get('ip')
user = data.get('user')
role = data.get('role')
node = node_info_obj(hostname, ip, user, role, name)
host_ip_list.append(node)
return host_ip_list
class AnsibleRunner(object):
def __init__(self,
host_list=None,
remote_user=None,
sudo=True):
# AnsibleRunner init.
self.host_list = host_list
self.sudo = sudo
def get_validated_data(self, results, stdout=True, stderr=False):
# print ("\n\nInside get_validated_data", results)
output = ''
###################################################
# First validation is to make sure connectivity to
# all the hosts was ok.
###################################################
if results['dark']:
output = ''
##################################################
# Now look for status 'failed'
##################################################
for node in results['contacted'].keys():
if 'failed' in results['contacted'][node]:
if results['contacted'][node]['failed'] is True:
output = ''
#################################################
# Check for the return code 'rc' for each host.
#################################################
for node in results['contacted'].keys():
info = results['contacted'][node]
if stdout:
op = info.get('stdout')
else:
op = info.get('stderr')
output = op
return output
def ansible_perform_operation(self,
host_list=None,
remote_user=None,
module=None,
complex_args=None,
module_args='',
environment=None,
check=False,
forks=2,
stderr=None,
stdout=None):
# Perform any ansible operation.
runner = ansible.runner.Runner(
module_name=module,
host_list=host_list,
remote_user=remote_user,
module_args=module_args,
complex_args=complex_args,
environment=environment,
check=check,
forks=forks)
results = runner.run()
res = self.get_validated_data(results, stdout, stderr)
return res
class FunctionalTestMethods(unittest.TestCase):
ansirunner = AnsibleRunner()
config = config.Configs()
node_config_file_name = config.node_config_file
node_reader = node_info_reader(node_config_file_name)
node_list = node_reader.get_host_list()
env_value = config.env_value
TEST_CASE_NAME = config.test_case_name
INVALID_TEST_CASE_NAME = config.invalid_test_case_name
TEST_CASES = config.all_test_cases
# expected_testcase_run = config.expected_testcase_run
# sleep_interval = config.sleep_interval
# input_periodic_test = config.input_periodic_test
# container_name = config.container_name
# update_script_file = config.update_script
# revert_script_file = config.revert_script
# tmp_loc = config.tmp_loc
# conf_file_path = config.conf_file_path
# endpoint testcase validation check
# services_to_check = config.services_to_check
# endpoint_testcase = config.endpoint_testcase
@classmethod
def setUpClass(cls):
# set_env_variables(cls.env_value)
pass
@classmethod
def tearDownClass(cls):
pass
def get_node_details(self, node_info):
self.host_list = node_info.getIp()
self.remote_user = node_info.getUser()
self.host = node_info.getHost()
self.role = node_info.getRole()
def reset_node_details(self):
self.host_list = None
self.remote_user = None
self.host = None
self.role = None
# Check for test run with invalid test case name with credentials
def test_invalid_test_case_run(self):
opt = utils.form_cli_env_params(self.env_value)
cmd = "%s run %s" % (opt, self.INVALID_TEST_CASE_NAME)
run_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=run_cmd,
stdout=False,
stderr=True)
check = self.INVALID_TEST_CASE_NAME + ' is invalid'
self.assertIn(check, res)
self.reset_node_details()
# Check for test run with valid test case name with credentials
def test_valid_test_case_run_and_delete(self):
opt = utils.form_cli_env_params(self.env_value)
cmd = "%s run %s" % (opt, self.TEST_CASE_NAME)
run_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=run_cmd,
stdout=True,
stderr=False)
result = utils.parse_run_cmd_result(res)
self.assertIn('Pass', result)
# Try deleting the test case
del_uuid = utils.get_uuid(res)
cmd = "%s delete %s" % (opt, del_uuid)
delete_cmd = utils.form_cmd(cmd)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=delete_cmd,
stdout=True,
stderr=False)
self.assertIn('', res)
self.reset_node_details()
# Check for test run with invalid test case name with env variable
def test_invalid_test_case_run_with_env(self):
cmd = "run %s" % (self.INVALID_TEST_CASE_NAME)
run_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=run_cmd,
environment=self.env_value,
stdout=False,
stderr=True)
check = self.INVALID_TEST_CASE_NAME + ' is invalid'
self.assertIn(check, res)
self.reset_node_details()
# Check for run & show command using credentials
def test_run_and_show_result(self):
opt = utils.form_cli_env_params(self.env_value)
cmd = "%s run %s" % (opt, self.TEST_CASE_NAME)
run_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=run_cmd,
stdout=True,
stderr=False)
result = utils.parse_run_cmd_result(res)
self.assertIn('Pass', result)
if res:
# Wait till the run completes and then execute show cmd
time.sleep(5)
uuid = utils.get_uuid(res)
cmd = "%s show %s" % (opt, uuid)
show_cmd = utils.form_cmd(cmd)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=show_cmd,
stdout=True,
stderr=False)
result = utils.parse_show_cmd_result(res, uuid)
self.assertEqual('Pass', result)
self.reset_node_details()
# Check for run & show command using credentials
def test_multiple_run_and_show(self):
opt = utils.form_cli_env_params(self.env_value)
for case in self.TEST_CASES:
cmd = "%s run %s" % (opt, case)
run_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=run_cmd,
stdout=True,
stderr=False)
if res:
uuid = utils.get_uuid(res)
cmd = "%s show %s" % (opt, uuid)
show_cmd = utils.form_cmd(cmd)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=show_cmd,
stdout=True,
stderr=False)
# Wait till the run completes and then execute show cmd
time.sleep(5)
result = utils.parse_show_cmd_result(res, uuid)
self.assertEqual('Pass', result)
self.reset_node_details()
# Check for result command is working with credentials
def test_result_cmd(self):
opt = utils.form_cli_env_params(self.env_value)
# result_cmd = "cloudpulse %s result"%(opt)
cmd = "%s result" % (opt)
result_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=result_cmd,
stdout=True,
stderr=False)
result = utils.parse_result_cmd_result(res)
self.assertIn('Pass', result)
self.reset_node_details()
# Check for delete command with invalid uuid is working with credentials
def test_delete_cmd_with_invalid_uuid(self):
opt = utils.form_cli_env_params(self.env_value)
del_uuid = str(uuid.uuid4())
cmd = "%s delete %s" % (opt, del_uuid)
delete_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=delete_cmd,
stdout=True,
stderr=False)
check = 'Test %s could not be found' % (del_uuid)
self.assertIn(check, res)
self.reset_node_details()
def test_with_invalid_user(self):
opt = utils.form_cli_env_params(self.env_value, invalid_uname=True)
cmd = '%s result ' % (opt)
result_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=result_cmd,
stdout=False,
stderr=True)
check = 'Could not find user:'
self.assertIn(check, res)
self.reset_node_details()
def test_with_invalid_password(self):
opt = utils.form_cli_env_params(self.env_value, invalid_pwd=True)
cmd = '--debug %s result ' % (opt)
result_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
pwd_test = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=result_cmd,
stdout=False,
stderr=True)
check = 'Invalid user / password'
self.assertIn(check, pwd_test)
self.reset_node_details()
def test_with_invalid_tenant(self):
opt = utils.form_cli_env_params(self.env_value, invalid_tenant=True)
cmd = '--debug %s result ' % (opt)
result_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
tenant_test = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=result_cmd,
stdout=False,
stderr=True)
check = 'Could not find project'
self.assertIn(check, tenant_test)
self.reset_node_details()
def test_with_invalid_auth(self):
opt = utils.form_cli_env_params(self.env_value, invalid_auth=True)
cmd = '--debug %s result ' % (opt)
result_cmd = utils.form_cmd(cmd)
for node in self.node_list:
self.get_node_details(node)
res = self.ansirunner.ansible_perform_operation(
host_list=[self.host_list],
remote_user=self.remote_user,
module="shell",
module_args=result_cmd,
stdout=False,
stderr=True)
check = 'Authorization Failed'
self.assertIn(check, res)
self.reset_node_details()
if __name__ == '__main__':
unittest.main()