Adding security pulse testcases

Change-Id: I55597ec84868d1263b0db934802b1c70ad7b9374
This commit is contained in:
Ramaraja 2015-10-19 08:52:06 -07:00 committed by Anand Shanmugam
parent f992c54b81
commit fe9a473d51
17 changed files with 966 additions and 68 deletions


@ -0,0 +1,53 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class HostObject(object):
def __init__(self, host, ip, user, role, name, isNagios=False):
self.host = host
self.ip = ip
self.user = user
self.isNagios = isNagios
self.role = role
self.name = name
self.dirlist = []
def setNagios(self, isNagios):
self.isNagios = isNagios
def isNagiosRunning(self):
return self.isNagios
def getHost(self):
return self.host
def getIp(self):
return self.ip
def getUser(self):
return self.user
def getRole(self):
return self.role
def getName(self):
return self.name
def setDirList(self, dir_list):
self.dirlist = dir_list
def getDirList(self):
return self.dirlist
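Review bot: `setDirList` stores the caller's list by reference, so any later mutation of the YAML-derived list shows up inside the host object; `self.dirlist = list(dir_list)` decouples them. The `getHost` … `getDirList` accessors and `setNagios` only wrap plain attributes; Python convention is direct attribute access (or `@property` where validation is needed), which would also remove the `isNagios`/`isNagiosRunning` naming split. A short class docstring stating what each field holds would help, noting that `host` and `name` both receive the YAML key in `setOpenstackNodeIp` as far as this commit shows.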


@ -0,0 +1,102 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from hostObj import HostObject
import os
import yaml
class os_cfg_reader(object):
def __init__(self, hostFileName="openstack_config.yaml"):
abs_path = os.getcwd() + os.sep + 'config/%s' % hostFileName
self.hostYamlObj = None
self.openstack_host_list = []
try:
fp = open(abs_path)
except IOError as e:
print ("Error while opening the file...%s" % e)
return
try:
self.hostYamlObj = yaml.load(fp)
# print "self.hostYamlObj: ", self.hostYamlObj,
# dir(self.hostYamlObj)
except yaml.error.YAMLError as perr:
print ("Error while parsing...%s" % perr)
return
def setOpenstackNodeIp(self):
# print self.hostYamlObj
for key in self.hostYamlObj.keys():
name = key
ip = self.hostYamlObj[key]["ip"]
hostname = key
username = self.hostYamlObj[key]["user"]
role = self.hostYamlObj[key]["role"]
hstObj = HostObject(
hostname,
ip,
username,
role,
name,
False)
if "dirlist" in self.hostYamlObj[key]:
dirList = self.hostYamlObj[key]["dirlist"]
hstObj.setDirList(dirList)
self.openstack_host_list.append(hstObj)
def get_host_list(self):
return self.openstack_host_list
def printHostList(self):
for hostObj in self.openstack_host_list:
print ("IP - %s" % (hostObj.getIp()))
print ("HOST - %s" % (hostObj.getHost()))
print ("USER - %s" % (hostObj.getUser()))
print ("NAGIOS RUNNING - %s" % (str(hostObj.isNagiosRunning())))
def generate_ansible_config(self, os_obj_list):
f = open('/tmp/sec_hc/ansible_hosts', 'w+')
for obj in os_obj_list:
# print obj.getName()
f.write('[' + obj.getName() + ']\n')
f.write(
obj.getIp() +
'\t\t' +
'ansible_ssh_user=' +
obj.getUser() +
'\t\tansible_ssh_pass=' +
obj.getPassword())
f.write('\n')
f.close()
"""
def update_ansible_playbook(self):
f = open('testcase-configs/ansible-playbook.yaml')
f1 = open('testcase-configs/ansible-playbook_update.yaml', "w")
for line in f:
if 'hosts' in line:
f1.write('- hosts: sasi1\n')
else:
f1.write(line)
f.close()
f1.close()
"""
if __name__ == '__main__':
yhp = os_cfg_reader()
yhp.setOpenstackNodeIp()
yhp.printHostList()
# yhp.generate_ansible_config(yhp.get_host_list())
yhp.update_ansible_playbook()
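Review bot: `self.hostYamlObj = yaml.load(fp)` uses PyYAML's full loader, which can construct arbitrary Python objects from tags in the file; use `self.hostYamlObj = yaml.safe_load(fp)` and open the file in a `with` block so `fp` is closed. `generate_ansible_config` calls `obj.getPassword()`, but `HostObject` in this commit has no password attribute or getter, so re-enabling that call would raise AttributeError; it also writes `ansible_ssh_pass` into `/tmp/sec_hc/ansible_hosts`, a location readable by every local user. The `__main__` block calls `yhp.update_ansible_playbook()`, which exists only inside the triple-quoted string above it, so running the module as a script fails. When the open fails, `__init__` prints and returns with `hostYamlObj` still `None`, and `setOpenstackNodeIp` then fails on `.keys()`; raising from `__init__` would surface the real cause.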


@ -15,10 +15,12 @@
class openstack_node_obj(object):
def __init__(self, host, ip, user, role, name):
self.host = host
self.ip = ip
self.user = user
# self.password = password
self.role = role
self.name = name
@ -31,6 +33,9 @@ class openstack_node_obj(object):
def getUser(self):
return self.user
# def getPassword(self):
# return self.password
def getRole(self):
return self.role


@ -40,6 +40,7 @@ class openstack_node_info_reader(object):
ip = self.hostYamlObj[key]["ip"]
hostname = key
username = self.hostYamlObj[key]["user"]
# password = self.hostYamlObj[key]["password"]
role = self.hostYamlObj[key]["role"]
node_obj = openstack_node_obj(hostname, ip, username,
role, name)


@ -0,0 +1,24 @@
# control-1:
# ip: 172.31.231.14
# user: root
# password: cisco123
# role: controller
# dirlist: [/etc/keystone,/etc/nova,/etc/neutron]
# compute-1:
# ip: 172.31.231.15
# user: root
# password: cisco123
# role: compute
# dirlist: [/etc/nova,/etc/neutron]
control-1:
ip: 172.29.74.98
user: ubuntu
password: CTO1234!
role: controller
dirlist: [/tmp/keystone,/tmp/nova,/tmp/neutron]
control-2:
ip: 172.31.231.59
user: root
password: cisco123
role: controller
dirlist: [/etc/my.cnf,/etc/my.cnf.d/,/var/lib/mysql/,/var/log/mariadb/mariadb.log,/var/run/mariadb/mariadb.pid]
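Review bot: The `password:` entries for `control-1` and `control-2` commit what look like real credentials, together with host addresses, into the repository; the sample file should carry obviously fake placeholders, with the real values kept in the deployment's `/etc/cloudpulse/openstack_config.yaml` (the default `testcase_setup_file`) outside version control. Any credential that has been pushed should be treated as exposed and rotated. The commented-out block at the top duplicates the live entries and can be dropped.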


@ -9,6 +9,7 @@ securityhealth:
input:
conf_file: [/etc/keystone/keystone.conf]
filepermission:
perform_on: [controller]
input:
baseline_file: /tmp/sec_hc/os_allnode_baseline
controller_dir: [/etc/keystone,/etc/nova,/etc/neutron]
@ -52,3 +53,9 @@ securityhealth:
ServerTokens: Prod
ServerSignature: off
TraceEnable: off
mysql:
perform_on: [controller]
testcase: [mysql_tls_enablement_test,mysql_filecheck_test]
mysql_tls_enablement_test:
perform_on: [controller]
input:


@ -14,7 +14,7 @@
# under the License.
from __future__ import print_function
import cloudpulse
# import cloudpulse
# from cloudpulse.operator.ansible.openstack_node import openstack_node_obj
from cloudpulse.operator.ansible.openstack_node_info_reader import \
openstack_node_info_reader
@ -27,13 +27,17 @@ from cloudpulse.scenario.plugins.security_pulse.util.\
security_pulse_test_input import security_test_input_reader
from cloudpulse.scenario.plugins.security_pulse.util import \
security_pulse_test_util
import json
import os
from oslo_config import cfg
TESTS_OPTS = [
cfg.StrOpt('testcase_input_file',
default='',
help='Security testcase input file')
help='Security testcase input file'),
cfg.StrOpt('testcase_setup_file',
default='/etc/cloudpulse/openstack_config.yaml',
help='setup file for security pulse test case'),
]
CONF = cfg.CONF
@ -54,32 +58,27 @@ class security_common_test(base.Scenario):
cfg.CONF.security_pulse_test.testcase_input_file
except Exception as e:
print ("Exception while reading the testcase input file")
return (404, e.message, [])
return (404, json.dumps([{'Message': e.message}]), [])
if not os.path.isfile(testcase_input_file):
print ("Security Testcase input file not found")
return (404, "Security Testcase input file not found", [])
# print testcase_input_file
base_dir = os.path.dirname(cloudpulse.__file__)
msg = {'Message': "Security Testcase input file not found"}
return (404, json.dumps([msg]), [])
# base_dir = os.path.dirname(cloudpulse.__file__)
input_reader = security_test_input_reader(testcase_input_file)
input_data = input_reader.process_security_input_file()
input_params = security_pulse_test_util.\
get_test_input_by_name("tls_enablement_check", input_data)
os_node_info_obj = \
openstack_node_info_reader(base_dir +
"/scenario/plugins/security_pulse/" +
"config/openstack_config.yaml")
# os_node_info_obj = \
# openstack_node_info_reader(base_dir +
# "/scenario/plugins/security_pulse/" +
# "config/openstack_config.yaml")
os_node_info_obj = openstack_node_info_reader(
cfg.CONF.security_pulse_test.testcase_setup_file)
openstack_node_list = os_node_info_obj.get_host_list()
input_params['os_host_list'] = openstack_node_list
# print input_params
tls_test = tls_enablement_test()
result = tls_test.perform_tls_enablement_test(input_params)
if not result:
return (404, "No result from test execution", [])
# print result
if result.startswith("Fail"):
return (404, result, [])
else:
return (200, result, [])
return result
def security_keystone_admin_token_check(self, *args, **kwargs):
testcase_input_file = ""
@ -88,38 +87,26 @@ class security_common_test(base.Scenario):
cfg.CONF.security_pulse_test.testcase_input_file
except Exception as e:
print ("Exception while reading the testcase input file")
return (404, e.message, [])
return (404, json.dumps([{'Message': e.message}]), [])
if not os.path.isfile(testcase_input_file):
return (404, "Security Testcase input file not found", [])
base_dir = os.path.dirname(cloudpulse.__file__)
msg = {'Message': "Security Testcase input file not found"}
return (404, json.dumps([msg]), [])
# base_dir = os.path.dirname(cloudpulse.__file__)
input_reader = security_test_input_reader(testcase_input_file)
input_data = input_reader.process_security_input_file()
input_params = security_pulse_test_util.\
get_test_input_by_name("ks_admin_token_check", input_data)
os_node_info_obj = \
openstack_node_info_reader(base_dir +
"/scenario/plugins/security_pulse/" +
"config/openstack_config.yaml")
# os_node_info_obj = \
# openstack_node_info_reader(base_dir +
# "/scenario/plugins/security_pulse/" +
# "config/openstack_config.yaml")
os_node_info_obj = openstack_node_info_reader(
cfg.CONF.security_pulse_test.testcase_setup_file)
openstack_node_list = os_node_info_obj.get_host_list()
input_params['os_host_list'] = openstack_node_list
# print input_params
ks_test = ks_admin_token_check()
result = ks_test.perform_ks_admin_token_check_test(input_params)
if not result:
return (404, "No result from test execution", [])
# print result
test_status = None
data = ""
for r in result:
if test_status is None or r[2].startswith("Fail"):
test_status = "fail"
elif test_status is None:
test_status = "success"
data = data + r[0] + " -> " + r[1] + " -> " + r[2] + "\n"
if test_status == "fail":
return (404, data, [])
else:
return (200, data, [])
return result
if __name__ == '__main__':
sct = security_common_test()
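Review bot: `e.message` in `return (404, json.dumps([{'Message': e.message}]), [])` (both hunks here, and the same line in security_filecheck_test.py and security_mysql_test.py) exists only on Python 2 exception objects; `str(e)` works on both interpreters. The replacement `return result` now hands back whatever `perform_tls_enablement_test` / `perform_ks_admin_token_check_test` return instead of the earlier `(status, body, [])` tuple, so the caller contract changed; a docstring on each scenario method stating what `result` holds would document that, since this file cannot show it. The commented-out `base_dir` and `openstack_node_info_reader(...)` lines and `# import cloudpulse` should be deleted rather than kept as comments. Both method bodies start above their hunks.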


@ -0,0 +1,87 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
# import cloudpulse
from cloudpulse.operator.ansible.openstack_node_info_reader import \
openstack_node_info_reader
from cloudpulse.scenario import base
from cloudpulse.scenario.plugins.security_pulse.testcase.file_check_test\
import SecurityFileCheck
from cloudpulse.scenario.plugins.security_pulse.util.\
security_pulse_test_input import security_test_input_reader
from cloudpulse.scenario.plugins.security_pulse.util import \
security_pulse_test_util
import json
import os
from oslo_config import cfg
TESTS_OPTS = [
cfg.StrOpt('testcase_input_file',
default='',
help='Security testcase input file'),
cfg.StrOpt('testcase_setup_file',
default='/etc/cloudpulse/openstack_config.yaml',
help='setup file for security pulse test case'),
]
CONF = cfg.CONF
security_pulse_test_group = cfg.OptGroup(name='security_pulse_test',
title='Security pulse test' +
' param input file')
CONF.register_group(security_pulse_test_group)
CONF.register_opts(TESTS_OPTS, security_pulse_test_group)
class security_filecheck_test(base.Scenario):
def security_file_check(self, *args, **kwargs):
testcase_input_file = ""
try:
testcase_input_file =\
cfg.CONF.security_pulse_test.testcase_input_file
except Exception as e:
print ("Exception while reading the testcase input file")
return (404, json.dumps([{'Message': e.message}]), [])
if not os.path.isfile(testcase_input_file):
print ("Security file checking Testcase input file not found")
msg = {'Message': "Security file checking Testcase input file \
not found"}
return (404, json.dumps([msg]), [])
# base_dir = os.path.dirname(cloudpulse.__file__)
input_reader = security_test_input_reader(testcase_input_file)
input_data = input_reader.process_security_input_file()
input_params = security_pulse_test_util.\
get_test_input_by_name("filepermission", input_data)
# os_node_info_obj = \
# openstack_node_info_reader(base_dir +
# "/scenario/plugins/security_pulse/" +
# "config/openstack_config.yaml")
os_node_info_obj = openstack_node_info_reader(
cfg.CONF.security_pulse_test.testcase_setup_file)
openstack_node_list = os_node_info_obj.get_host_list()
input_params['os_host_list'] = openstack_node_list
sec_file_check = SecurityFileCheck()
result = \
sec_file_check.perform_file_permission_check(input_params)
print ("result from security_file_check")
print (result)
return result
if __name__ == '__main__':
sfc = security_filecheck_test()
sfc.security_file_check()
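Review bot: The literal at `msg = {'Message': "Security file checking Testcase input file \` / `not found"}` uses a backslash continuation inside the string, so the indentation of the next source line becomes part of the message (a run of spaces between "file" and "not"). Use implicit concatenation: `msg = {'Message': "Security file checking Testcase input file "` followed by `"not found"}` on the next line. `security_file_check` prints its result and returns it unchanged; a one-line docstring saying the return value is whatever `perform_file_permission_check` produces would help callers, as would `str(e)` in place of `e.message`.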


@ -0,0 +1,87 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
# import cloudpulse
# from cloudpulse.operator.ansible.openstack_node import openstack_node_obj
from cloudpulse.operator.ansible.openstack_node_info_reader import \
openstack_node_info_reader
from cloudpulse.scenario import base
from cloudpulse.scenario.plugins.security_pulse.testcase.mysql_tls_enable_test\
import mysql_tls_enablement_test
from cloudpulse.scenario.plugins.security_pulse.util.\
security_pulse_test_input import security_test_input_reader
from cloudpulse.scenario.plugins.security_pulse.util import \
security_pulse_test_util
import json
import os
from oslo_config import cfg
TESTS_OPTS = [
cfg.StrOpt('testcase_input_file',
default='',
help='Security testcase input file'),
cfg.StrOpt('testcase_setup_file',
default='/etc/cloudpulse/openstack_config.yaml',
help='setup file for security pulse test case'),
]
CONF = cfg.CONF
security_pulse_test_group = cfg.OptGroup(name='security_pulse_test',
title='Security pulse test' +
' param input file')
CONF.register_group(security_pulse_test_group)
CONF.register_opts(TESTS_OPTS, security_pulse_test_group)
class security_mysql_test(base.Scenario):
def security_mysql_tsl_enable_check(self, *args, **kwargs):
testcase_input_file = ""
try:
testcase_input_file =\
cfg.CONF.security_pulse_test.testcase_input_file
except Exception as e:
print ("Exception while reading the testcase input file")
return (404, json.dumps([{'Message': e.message}]), [])
if not os.path.isfile(testcase_input_file):
print ("Security mysql Testcase input file not found")
msg = {'Message': "Security mysql Testcase input file not found"}
return (404, json.dumps([msg]), [])
# base_dir = os.path.dirname(cloudpulse.__file__)
input_reader = security_test_input_reader(testcase_input_file)
input_data = input_reader.process_security_input_file()
input_params = security_pulse_test_util.\
get_test_input_by_name("mysql_tls_enablement_test", input_data)
# os_node_info_obj = \
# openstack_node_info_reader(base_dir +
# "/scenario/plugins/security_pulse/" +
# "config/openstack_config.yaml")
os_node_info_obj = openstack_node_info_reader(
cfg.CONF.security_pulse_test.testcase_setup_file)
openstack_node_list = os_node_info_obj.get_host_list()
input_params['os_host_list'] = openstack_node_list
mysql_common_test = mysql_tls_enablement_test()
result = \
mysql_common_test.perform_mysql_tls_enablement_test(input_params)
print ("result from security_mysql_tsl_enable_check")
print (result)
return result
if __name__ == '__main__':
sct = security_mysql_test()
sct.security_mysql_tsl_enable_check()
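Review bot: `security_mysql_tsl_enable_check` (and the `__main__` call) misspells TLS as `tsl`; if the scenario method name is what operators reference, renaming it to `security_mysql_tls_enable_check` now avoids a compatibility break later. The option reading, file-existence check, input parsing and `openstack_node_info_reader` setup are copied verbatim from security_common_test.py and security_filecheck_test.py; one helper in `security_pulse_test_util` returning the prepared `input_params` (or the error tuple) would leave each scenario method with only its test-specific call. The `TESTS_OPTS` / `register_group` / `register_opts` preamble is likewise repeated per module and belongs in one place.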


@ -21,19 +21,39 @@ import stat
class tls_enable_check(object):
def __init__(self):
pass
def read_tls_config(self, config):
Result = {}
final_result = {}
overall_status = True
try:
config.get("ldap", "use_tls")
except ConfigParser.NoOptionError:
print ("Fail - use_tls option is not enabled")
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'Test Case Name': 'TLS',
'Message': 'use_tls option is not enabled',
'Status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
else:
use_tls = config.get("ldap", "use_tls")
if use_tls == 'false':
print ("Fail - use_tls option is enabled with 'false' value")
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'Test Case Name': 'TLS',
'Message': "use_tls option is enabled with 'false' value",
'Status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
elif use_tls == 'true':
ca_dir = None
@ -44,12 +64,28 @@ class tls_enable_check(object):
tls_ca_file = config.get("ldap", "tls_cacertfile")
ca_dir = tls_ca_file[:tls_ca_file.rindex('/')]
except ConfigParser.NoOptionError:
print ("Fail - Both 'tls_ca_dir' and " +
"'tls_ca_file' are not defined")
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'Test Case Name': 'TLS',
'Message': "Both 'tls_ca_dir' and" +
" 'tls_ca_file' are not defined",
'Status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
if not ca_dir:
print ("Fail - Both 'tls_ca_dir' and " +
"'tls_ca_file' are not defined")
overall_status = False
final_result.update({'OverallStatus': overall_status})
msg = {
'Test Case Name': 'TLS',
'Message': "Both 'tls_ca_dir' and" +
" 'tls_ca_file' are not defined",
'Status': 'Fail'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
else:
for dirName, subdirList, fileList in os.walk(ca_dir):
@ -58,14 +94,30 @@ class tls_enable_check(object):
st = os.stat(f1)
user = pwd.getpwuid(st[stat.ST_UID])[0]
group = pwd.getpwuid(st[stat.ST_GID])[0]
# mode = oct(stat.S_IMODE(st[stat.ST_MODE]))
if user != 'keystone' or group != 'keystone':
print ("Fail - Certificate file directory " +
"user/group permission are user=%s, " +
"group=%s ", user, group)
msg = "Certificate file directory " + \
" user/group permission are user=" + user \
+ ", group=" + group
overall_status = False
final_result.update(
{'OverallStatus': overall_status})
res = {
'Test Case Name': 'TLS',
'Message': msg,
'Status': 'Fail'}
Result.update(res)
final_result.update({'result': [Result]})
print (final_result)
return
print ("Success - TLS is enabled and the Certificate file " +
"permissions are 'keystone'")
final_result.update({'OverallStatus': overall_status})
msg = {
'Test Case Name': 'TLS',
'Message': "TLS is enabled and the Certificate file" +
" permissions are 'keystone'",
'Status': 'Pass'}
Result.update(msg)
final_result.update({'result': [Result]})
print (final_result)
return
if __name__ == '__main__':
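Review bot: The 'Pass' result built at the end of `read_tls_config` (`'Message': "TLS is enabled and the Certificate file" ...`) is reached for every `use_tls` value that is neither the literal `'false'` nor `'true'`, so `True`, `yes` or `1` in keystone.conf report success without any certificate check; compare on `config.getboolean("ldap", "use_tls")` (or `use_tls.lower()`) and add a final `else` that records a Fail for unrecognised values. `group = pwd.getpwuid(st[stat.ST_GID])[0]` looks the gid up in the passwd database; `grp.getgrgid(st[stat.ST_GID]).gr_name` is the group name. Each branch repeats the same lines to fill `final_result` and print it; a small helper would reduce the method to its decision logic, as sketched below. The method body continues between the three hunks.
```python
def _emit(self, status, message):
    """Print and return the payload every branch of read_tls_config builds."""
    payload = {
        'OverallStatus': status == 'Pass',
        'result': [{'Test Case Name': 'TLS',
                    'Message': message,
                    'Status': status}],
    }
    print (payload)
    return payload

def read_tls_config(self, config):
    # … unchanged: fetch use_tls, failing via self._emit('Fail', ...) when absent …
    if use_tls.lower() == 'false':
        return self._emit('Fail', "use_tls option is enabled with 'false' value")
    elif use_tls.lower() == 'true':
        pass  # … unchanged: CA dir / certificate owner checks, each via self._emit …
    else:
        return self._emit('Fail', "use_tls has unrecognised value '%s'" % use_tls)
```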


@ -65,7 +65,6 @@ class SecurityFileCheck(object):
ans_runner = ansible_runner([obj])
if obj.getRole() == p:
os_dir = input_params[p + '_dir']
# self.createDirList(os_dir, file_info_dir)
all_baseline = eval(open(baseline_file).read())
baseline = all_baseline[p]
open(


@ -0,0 +1,67 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cloudpulse
from cloudpulse.operator.ansible.ansible_runner import ansible_runner
from cloudpulse.operator.ansible import openstack_config_reader as os_cfg
import json
import os
import sys
class BaseLine(object):
def base_line(self, os_baseline_cfg):
try:
oscfg_reader = os_cfg.os_cfg_reader(os_baseline_cfg)
oscfg_reader.setOpenstackNodeIp()
oscfg_reader.printHostList()
openstack_host_list = oscfg_reader.get_host_list()
baseline_data = {}
for host in openstack_host_list:
f = open('/tmp/sec_hc/dir_list', 'w+')
for dir_name in host.getDirList():
f.write(dir_name + '\n')
f.close()
ans_runner = ansible_runner([host])
# execute_cmd
base_dir = os.path.dirname(cloudpulse.__file__)
base_dir += '/scenario/plugins/security_pulse/testcase'
flist = [base_dir + '/remote_baseline.py',
base_dir + '/remote_filecredentials.py',
'/tmp/sec_hc/dir_list'
]
results = ans_runner.execute_cmd(
"python " +
'/tmp/sec_hc/' +
"remote_baseline.py ",
file_list=flist)
# for node in results['contacted'].keys():
role = host.getRole()
node = host.getIp()
data = results['contacted'][node]['stdout']
baseline_data.update({role: eval(data)})
print (baseline_data)
formated_data = json.dumps(baseline_data, indent=4)
open('/tmp/sec_hc/os_allnode_baseline',
'w+').write(str(formated_data))
except Exception as e:
print (e)
if __name__ == '__main__':
os_cfg_file = sys.argv[1]
sec = BaseLine()
sec.base_line(os_cfg_file)
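Review bot: `baseline_data.update({role: eval(data)})` evaluates the stdout of a command run on a remote host as Python, so any host in the inventory (or anything able to tamper with the ansible transport) can execute code inside the cloudpulse process; have remote_baseline.py emit `json.dumps(output)` and parse it here with `json.loads(data)`. The `except Exception as e: print (e)` wrapper swallows every failure, including a missing `results['contacted'][node]` entry when a host is unreachable, and the method still returns None, so callers cannot tell that no baseline was written. `/tmp/sec_hc/dir_list` and `/tmp/sec_hc/os_allnode_baseline` are opened without `with` and the baseline handle is never closed; a `with open(...) as f:` block fixes both, and a directory that is not world-writable (or `tempfile.mkdtemp()`) is safer than a fixed `/tmp` path for data that later drives permission checks.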


@ -18,53 +18,99 @@ import os
class keystone_admin_token_check(object):
def __init__(self):
pass
def keystone_admin_token_test(self):
ks_conf_file = "/etc/keystone/keystone.conf"
result = []
output = []
Result = {}
final_result = {}
overall_status = True
config = ConfigParser.ConfigParser()
if os.path.exists(ks_conf_file):
try:
config.read(ks_conf_file)
except Exception:
result.append("admin_token - keystone.conf not found - Fail")
msg = {
'Test Case Name': 'Admin Token',
'Message': 'keystone.conf not found',
'Status': 'Fail'}
Result.update(msg)
overall_status = False
else:
try:
config.get("DEFAULT", "admin_token")
except ConfigParser.NoOptionError:
result.append("admin_token - Not defined - Pass")
msg = {
'Test Case Name': 'Admin Token',
'Message': 'Admin Token is not defined',
'Status': 'Pass'}
Result.update(msg)
else:
result.append("admin_token - Defined - Fail")
msg = {
'Test Case Name': 'Admin Token',
'Message': 'Admin Token is defined',
'Status': 'Fail'}
Result.update(msg)
overall_status = False
else:
result.append("admin_token - keystone.conf not found - Fail")
msg = {
'Test Case Name': 'Admin Token',
'Message': 'keystone.conf not found',
'Status': 'Fail'}
Result.update(msg)
overall_status = False
output.append(Result)
Result = {}
ks_paste_conf_file = "/etc/keystone/keystone-paste.ini"
if os.path.exists(ks_paste_conf_file):
try:
config.read(ks_paste_conf_file)
except Exception:
result.append("admin_auth_token - keystone-paste.ini not " +
"found - Pass")
msg = {
'Test Case Name': 'Admin Token AuthMiddleware',
'Message': 'keystone-paste.ini not found',
'Status': 'Pass'}
Result.update(msg)
else:
try:
config.get("filter:admin_token_auth",
"paste.filter_factory")
except (ConfigParser.NoOptionError,
ConfigParser.NoSectionError):
result.append("admin_auth_token - Not defined - Pass")
msg = {
'Test Case Name': 'Admin Token AuthMiddleware',
'Message': 'admin_auth_token not defined',
'Status': 'Pass'}
Result.update(msg)
else:
option = config.get("filter:admin_token_auth",
"paste.filter_factory")
if "AdminTokenAuthMiddleware" in option:
result.append("admin_auth_token - Defined - Fail")
msg = {
'Test Case Name': 'Admin Token AuthMiddleware',
'Message': 'admin_auth_token defined',
'Status': 'Fail'}
Result.update(msg)
overall_status = False
else:
result.append("admin_auth_token - Not Defined - Pass")
msg = {
'Test Case Name': 'Admin Token AuthMiddleware',
'Message': 'admin_auth_token not defined',
'Status': 'Pass'}
Result.update(msg)
else:
result.append("admin_auth_token - keystone-paste.ini not found " +
"- Pass")
print (result)
msg = {
'Test Case Name': 'Admin Token AuthMiddleware',
'Message': 'keystone-paste.ini not found',
'Status': 'Pass'}
Result.update(msg)
output.append(Result)
final_result.update({'OverallStatus': overall_status})
final_result.update({'result': output})
print (final_result)
if __name__ == '__main__':
keystone_admin_token_check_obj = keystone_admin_token_check()
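Review bot: `ConfigParser.read` does not raise for a missing file — it silently returns the list of files it managed to read — so the `except Exception` branches that report `'keystone.conf not found'` and `'keystone-paste.ini not found'` actually fire on parse errors; `'keystone.conf could not be parsed'` describes what happened, and `ConfigParser.Error` is the narrower exception to catch. The `result` list (`result.append(...)` in every branch) is built but no longer printed or returned now that `final_result` carries the outcome, so it can be dropped. The same `config` object is reused for keystone-paste.ini after keystone.conf, so sections from the first file remain visible during the second lookup; a fresh `ConfigParser.ConfigParser()` per file avoids that. The method's tail and the `__main__` block continue past the hunk.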


@ -0,0 +1,63 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import pwd
import remote_filecredentials as filecredentials
import stat
class FileTraversal(object):
def file_traversal(self, dir_list, file_dir):
try:
output = {}
for dir_name in dir_list:
self.rootDir = dir_name
for dirName, subdirList, fileList in os.walk(self.rootDir):
os.chdir(dirName)
for f1 in fileList:
st = os.stat(f1)
ins = filecredentials.AccessPreveliges(
f1, st[stat.ST_SIZE], oct(
stat.S_IMODE(
st[
stat.ST_MODE])), pwd.getpwuid(
st[stat.ST_UID]), pwd.getpwuid(
st[stat.ST_GID]))
output.update(
{
ins.getName(): {
'size': ins.getSize(),
'mode': ins.getMode(),
'user': ins.getUser(),
'group': ins.getGroup()}})
print (output)
except Exception as e:
print (e)
if __name__ == '__main__':
# LOG.info('Executing test')
file_dir = '/tmp/sec_hc/'
dirs = []
with open(file_dir + 'dir_list') as f:
dirs = f.read().splitlines()
sec = FileTraversal()
# LOG.info('Executing test1')
sec.file_traversal(dirs, file_dir)
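Review bot: `output` is keyed by the bare file name (`ins.getName()` is `f1` from `os.walk`), so two files with the same basename under different entries of `dir_list` — a `policy.json` in both `/etc/keystone` and `/etc/nova`, say — overwrite each other and only one reaches the baseline; key on `os.path.join(dirName, f1)` and stat that path instead of `os.chdir(dirName)`, which also stops the script from changing its working directory. `pwd.getpwuid(st[stat.ST_GID])` resolves the gid through the passwd database; `grp.getgrgid` is the lookup for group names. `file_dir` is accepted but never used, and `print (output)` emits a Python literal that the controller side `eval`s; `print (json.dumps(output))` gives the consumer something it can parse safely.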


@ -0,0 +1,127 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import pwd
import remote_filecredentials as filecredentials
import stat
import string
class FileCheck(object):
def file_check(self, dir_list, file_dir):
try:
output = {}
result = []
final_result = {}
overall_status = True
for dir_name in dir_list:
self.rootDir = dir_name
for dirName, subdirList, fileList in os.walk(self.rootDir):
# flist = []
# for f in fileList:
# flist.append(os.path.abspath(os.path.join(dirName,f)))
os.chdir(dirName)
for f1 in fileList: # flist
st = os.stat(f1)
ins = filecredentials.AccessPreveliges(
f1, st[stat.ST_SIZE], oct(
stat.S_IMODE(
st[
stat.ST_MODE])), pwd.getpwuid(
st[stat.ST_UID]), pwd.getpwuid(
st[stat.ST_GID]))
output.update(
{
ins.getName(): {
'size': ins.getSize(),
'mode': ins.getMode(),
'user': ins.getUser(),
'group': ins.getGroup()}})
keystone_baseline = eval(open(file_dir + 'os_baseline').read())
remote_mismatch = list(set(output.keys()).
difference(keystone_baseline.keys()))
baseline_mismatch = list(set(keystone_baseline.keys()).
difference(output.keys()))
for key in output.keys():
if key in keystone_baseline:
new = output.get(key)
base = keystone_baseline[key]
diffkeys = [k for k in base if base[k] != new[k]]
l = []
for k in diffkeys:
l.append(
'"' +
k +
'"' +
' is modified from ' +
base[k] +
' to ' +
new[k] +
' in remote')
msg = string.join(l, ', ')
if msg:
temp = {'Test Case Name': key, 'Status': 'Fail'}
temp.update({'Message': msg})
result.append(temp)
if baseline_mismatch:
for item in baseline_mismatch:
msg = 'File not found in remote'
temp = {'Test Case Name': item, 'Status': 'Fail'}
temp.update({'Message': msg})
result.append(temp)
if remote_mismatch:
for item in remote_mismatch:
msg = 'New file found in remote'
temp = {'Test Case Name': item, 'Status': 'Fail'}
temp.update({'Message': msg})
result.append(temp)
if not result:
overall_status = True
final_result.update(
{'OverallStatus': overall_status})
result = {}
result.update({'Test Case Name': 'File permission Check'})
result.update({'Status': 'Pass'})
result.update({'Message': 'No mismatch'})
final_result.update({'result': [result]})
print (final_result)
return
else:
final_result.update(
{'OverallStatus': False})
final_result.update({'result': result})
print (final_result)
return
except Exception as e:
final_result.update(
{'OverallStatus': False})
result = {}
result.update({'Test Case Name': 'File permission Check'})
result.update({'Status': 'Fail'})
result.update(
{'Message': 'Exception in file comparision' + str(e)})
final_result.update({'result': [result]})
print (final_result)
return
if __name__ == '__main__':
file_dir = '/tmp/sec_hc/'
dirs = []
with open(file_dir + 'dir_list') as f:
dirs = f.read().splitlines()
sec = FileCheck()
sec.file_check(dirs, file_dir)
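Review bot: `keystone_baseline = eval(open(file_dir + 'os_baseline').read())` executes the contents of a file shipped to the host as Python; the baseline is produced with `json.dumps` on the controller side, so `with open(file_dir + 'os_baseline') as fh: keystone_baseline = json.load(fh)` parses it without running it (the `eval(open(baseline_file).read())` in the `SecurityFileCheck` hunk has the same shape). The walk and `AccessPreveliges` block is a copy of remote_baseline.py, including the basename-keyed `output` that collapses same-named files from different directories, so the two scripts should share one module. `string.join(l, ', ')` is Python 2 only; `', '.join(l)` works everywhere. The `except Exception` path glues the exception text onto `'Exception in file comparision'` without a separator; `'Exception in file comparison: ' + str(e)` reads correctly.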


@ -0,0 +1,39 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class AccessPreveliges(object):
def __init__(self, name=None, size=None, mode=None, user=None, group=None):
self.name = name
self.size = str(size)
self.mode = mode
self.user = user
self.group = group
def getName(self):
return self.name
def getSize(self):
return self.size
def getMode(self):
return self.mode
def getUser(self):
return self.user[0]
def getGroup(self):
return self.group[0]
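Review bot: `AccessPreveliges` stores the whole `pwd` struct for `user` and `group` and unpacks field `[0]` in `getUser`/`getGroup`, while `size` is converted to `str` in `__init__`; storing the names directly (`self.user = user[0]`) or having the callers pass names keeps the object a plain record, and a docstring should say which fields are expected to be structs. The class name misspells Privileges; both remote scripts are new in this commit, so renaming is cheap now. The `getX` accessors could give way to attribute access, or the class to the dict that `file_traversal` and `file_check` build from it anyway.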


@ -0,0 +1,152 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import ConfigParser
import os
from pwd import getpwuid
import string
class mysql_tls_enable_check(object):
def __init__(self):
self.ssl_status = False
self.SSL_file = '/etc/my.cnf.d/server.cnf'
self.SSLOwner = 'mysql'
self.config = ConfigParser.ConfigParser(allow_no_value=True)
def getSSLStatus(self):
ssl_files = []
Result = {}
final_result = {}
overall_status = True
if os.path.exists(self.SSL_file):
try:
self.config.read(self.SSL_file)
self.ssl_status = self.config.get('mysqld', 'ssl')
if self.ssl_status in ['true', 'True']:
ssl_files.append(self.config.get('mysqld', 'ssl-ca'))
ssl_files.append(self.config.get('mysqld', 'ssl-cert'))
ssl_files.append(self.config.get('mysqld', 'ssl-key'))
file_objs = self.getFileInfo(ssl_files)
if file_objs:
op = self.checkFilePermission(file_objs)
if op:
overall_status = False
final_result.update(
{'OverallStatus': overall_status})
Result.update(
{
'Test Case Name': 'mysql TSL',
'Message': 'SSL is enabled in mysql with \
following mismatch - ' +
string.join(
op,
', '),
'Status': 'Fail'})
final_result.update({'result': [Result]})
print (final_result)
return
else:
overall_status = True
final_result.update(
{'OverallStatus': overall_status})
Result.update({'Test Case Name': 'mysql TSL',
'Message': 'SSL is enabled in \
mysql',
'Status': 'Pass'})
final_result.update({'result': [Result]})
print (final_result)
return
else:
overall_status = False
final_result.update({'OverallStatus': overall_status})
Result.update(
{
'Test Case Name': 'mysql TSL',
'Message': 'SSL is enabled in mysql and not \
able to check the file permission of \
SSL files',
'Status': 'Fail'})
final_result.update({'result': [Result]})
print (final_result)
return
else:
overall_status = False
final_result.update({'OverallStatus': overall_status})
Result.update({'Test Case Name': 'mysql TSL',
'Message': 'SSL is not enabled in mysql',
'Status': 'Fail'})
final_result.update({'result': [Result]})
print (final_result)
return
except Exception:
overall_status = False
final_result.update({'OverallStatus': overall_status})
Result.update({'Test Case Name': 'mysql TSL',
'Message': 'Exception while \
reading ' + self.SSL_file,
'Status': 'Fail'})
final_result.update({'result': [Result]})
print (final_result)
return
else:
overall_status = False
final_result.update({'OverallStatus': overall_status})
Result.update({'Test Case Name': 'mysql TSL',
'Message': self.SSL_file + ' not found',
'Status': 'Fail'})
final_result.update({'result': [Result]})
print (final_result)
return
def formfileObj(self, file_name, stat_file_obj):
file_info = {}
try:
file_info['owner'] = getpwuid(stat_file_obj.st_uid).pw_name
file_info['group_owner'] = getpwuid(stat_file_obj.st_gid).pw_name
except Exception:
pass
return file_info
def getFileInfo(self, files=[]):
file_objs = {}
try:
for f in files:
obj = os.stat(f)
opt = self.formfileObj(f, obj)
file_objs.update({f: opt})
except Exception:
pass
return file_objs
def checkFilePermission(self, file_objs={}):
result = []
for file, obj in file_objs.items():
if obj.get('owner') != self.SSLOwner:
msg = 'File "%s" owner permission is not matching' % (file)
result.append(msg)
if obj.get('group_owner') != self.SSLOwner:
msg = 'File "%s" group owner permision is not matching' % (
file)
result.append(msg)
return result
if __name__ == '__main__':
checkssl = mysql_tls_enable_check()
checkssl.getSSLStatus()
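Review bot: `self.ssl_status in ['true', 'True']` in `getSSLStatus` treats every other spelling as SSL disabled, and because the parser is built with `allow_no_value=True` a bare `ssl` line under `[mysqld]` yields `None` and is reported as 'SSL is not enabled in mysql'; confirm against the target server configs and handle the no-value case explicitly. The messages at `'SSL is enabled in mysql with \`, `'SSL is enabled in \`, `'SSL is enabled in mysql and not \` and `'Exception while \` use backslash continuation inside string literals, so the next line's indentation becomes a run of spaces in the text; use implicit concatenation. `files=[]` and `file_objs={}` are mutable default arguments; use `None` defaults. `string.join(op, ', ')` is Python 2 only. `checkFilePermission` compares only owner and group, not the file mode, so its name promises more than it checks.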