Converting to oslo.config for configuration

- Merges arguments.py and and existing config.py files into one file
- Removes dependencies on cafe.* config file / CLI parsing
- Adds "register_opts" to BTC to allow Tests to specify config options
- Moves us completely from cclogging to Python logging

Change-Id: I0d4a84563d54307c94c0064be429919f9d91d67b
This commit is contained in:
Charles Neill 2016-07-05 15:17:43 -05:00
parent 70c62a0c0b
commit a8a9ac6e37
18 changed files with 309 additions and 262 deletions

View File

@ -1,3 +1,4 @@
opencafe>=0.2.4,<0.2.5
six>=1.9.0
requests>=2.9.0
oslo.config>=3.10.0 # Apache-2.0

View File

@ -20,6 +20,9 @@ classifier =
console_scripts =
syntribos = syntribos.runner:entry_point
oslo.config.opts =
syntribos.config = syntribos.config:list_opts
[build_sphinx]
all_files = 1
build-dir = doc/build

View File

@ -1,109 +0,0 @@
# Copyright 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
import sys
import cafe.drivers.unittest.arguments
class InputType(object):
"""Reads a file/directory, or stdin, to collect request templates."""
def __init__(self, mode, bufsize):
self._mode = mode
self._bufsize = bufsize
def __call__(self, string):
"""Yield the name and contents of the 'input' file(s)
:param str string: the value supplied as the 'input' argument
:rtype: tuple
:returns: (file name, file contents)
"""
if string == '-':
fp = sys.stdin
yield fp.name, fp.read()
elif os.path.isdir(string):
for path, _, files in os.walk(string):
for file_ in files:
file_path = os.path.join(path, file_)
fp = open(file_path, self._mode, self._bufsize)
yield file_, fp.read()
fp.close()
elif os.path.isfile(string):
try:
fp = open(string, self._mode, self._bufsize)
yield os.path.split(fp.name)[1], fp.read()
fp.close()
except Exception as e:
message = "can't open {}:{}"
raise Exception(message.format(string, e))
else:
message = "can't open {} not a readable file or dir"
raise Exception(message.format(string))
class SyntribosCLI(argparse.ArgumentParser):
"""Class for parsing Syntribos command-line arguments."""
def __init__(self, *args, **kwargs):
super(SyntribosCLI, self).__init__(*args, **kwargs)
self._add_args()
def _add_args(self):
self.add_argument(
"config", metavar="<config>",
action=cafe.drivers.unittest.arguments.ConfigAction,
help="test config. Looks in the ~/.opencafe/configs directory"
"Example: compute/dev.environ")
self.add_argument(
"input", metavar="<input_file>", type=InputType('r', 0),
help="<input file|directory of files|-(for stdin)>")
self.add_argument(
"-t", "--test-types", metavar="TEST_TYPES", nargs="*",
default=[""], help="Test types to run against api")
self.add_argument(
"-v", "--verbose",
action="store_true",
help="unittest verbose pass through")
self.add_argument(
"--dry-run",
action="store_true",
help="Dry Run gets all test cases but does not run them")
self.add_argument(
'-o', '--output', dest='output_file', action='store',
default=None, help='write report to filename')
self.add_argument(
'-f', '--format', dest='output_format', action='store',
default='json', help='specify output format',
choices=["json"])
self.add_argument(
'-S', '--min_severity', dest='min_severity', action='store',
default='LOW', help='specify minimum severity for reporting',
type=str.upper, choices=['LOW', 'MEDIUM', 'HIGH'])
self.add_argument(
'-C', '--min_confidence', dest='min_confidence', action='store',
default='LOW', help='specify minimum confidence for reporting',
type=str.upper, choices=['LOW', 'MEDIUM', 'HIGH'])

View File

@ -11,16 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from oslo_config import cfg
import syntribos.signal
import syntribos.tests.fuzz.config
if not os.environ.get("CAFE_CONFIG_FILE_PATH"):
os.environ["CAFE_CONFIG_FILE_PATH"] = "./"
config = syntribos.tests.fuzz.config.BaseFuzzConfig()
CONF = cfg.CONF
def percentage_difference(resp1, resp2):
@ -54,7 +49,7 @@ def percentage_difference(resp1, resp2):
elif data["req_diff"] == data["resp_diff"]:
# Response difference accounted for by difference in request lengths
return None
elif data["percent_diff"] < config.percent:
elif data["percent_diff"] < CONF.test.length_diff_percent:
# Difference not larger than configured percentage
return None
@ -71,7 +66,7 @@ def percentage_difference(resp1, resp2):
"\tConfig percent: {8}\n").format(
data["req1_len"], data["resp1_len"], data["req2_len"],
data["resp2_len"], data["req_diff"], data["resp_diff"],
data["percent_diff"], data["dir"], config.percent)
data["percent_diff"], data["dir"], CONF.test.length_diff_percent)
slug = "LENGTH_DIFF_{dir}".format(dir=data["dir"])

View File

@ -11,17 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from oslo_config import cfg
import syntribos.signal
import syntribos.tests.fuzz.config
if not os.environ.get("CAFE_CONFIG_FILE_PATH"):
os.environ["CAFE_CONFIG_FILE_PATH"] = "./"
config = syntribos.tests.fuzz.config.BaseFuzzConfig()
CONF = cfg.CONF
def percentage_difference(resp1, resp2):
@ -49,7 +43,7 @@ def percentage_difference(resp1, resp2):
if data["resp1_time"] < data["resp2_time"]:
data["dir"] = "OVER"
if data["percent_diff"] < config.time_difference_percent:
if data["percent_diff"] < CONF.test.time_diff_percent:
# Difference not larger than configured percentage
return None
@ -62,7 +56,7 @@ def percentage_difference(resp1, resp2):
"\tDifference direction: {4}"
"\tConfig percent: {5}\n").format(
data["resp1_time"], data["resp2_time"], data["time_diff"],
data["percent_diff"], data["dir"], config.percent)
data["percent_diff"], data["dir"], CONF.test.time_diff_percent)
slug = "TIME_DIFF_{dir}".format(dir=data["dir"])
@ -75,14 +69,14 @@ def absolute_time(response):
:returns: SynSignal or None
"""
if response.elapsed.total_seconds() < config.absolute_time:
if response.elapsed.total_seconds() < CONF.test.max_time:
return None
data = {
"request": response.request,
"response": response,
"elapsed": response.elapsed.total_seconds(),
"max_time": config.absolute_time
"max_time": CONF.test.max_time
}
text = (

View File

@ -82,9 +82,9 @@ def _log_transaction(log, level=logging.DEBUG):
response = func(*args, **kwargs)
except requests.exceptions.RequestException as exc:
signals.register(http_checks.check_fail(exc))
log.critical('Call to requests FAILED')
log.log(level, "A call to request() failed.")
log.exception(exc)
# raise exc
log.log(level, "=" * 80)
except Exception as exc:
log.critical('Call to Requests failed due to exception')
log.exception(exc)

View File

@ -1,4 +1,4 @@
# Copyright 2015 Rackspace
# Copyright 2015-2016 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -11,21 +11,196 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cafe.engine.models.data_interfaces as data_interfaces
import logging
import os
import sys
from oslo_config import cfg
import syntribos
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class MainConfig(data_interfaces.ConfigSectionInterface):
def handle_config_exception(exc):
msg = ""
"""Reads in configuration data from config file."""
if isinstance(exc, cfg.RequiredOptError):
msg = "Missing option '{opt}'".format(opt=exc.opt_name)
if exc.group:
msg += " in group '{}'".format(exc.group)
CONF.print_help()
SECTION_NAME = "syntribos"
elif isinstance(exc, cfg.ConfigFilesNotFoundError):
msg = ("Configuration file specified ('{config}') wasn't "
"found or was unreadable.").format(
config=",".join(CONF.config_file))
@property
def endpoint(self):
"""The target host to be tested."""
return self.get("endpoint")
if msg:
LOG.warning(msg)
print("=" * 80)
sys.exit(0)
else:
raise exc
@property
def version(self):
"""Used for base_auth test."""
return self.get("version")
class ExistingPathType(object):
def _raise_invalid_file(self, filename, exc=None):
msg = ("\nCan't open '{filename}'; not a readable file or dir."
"\nPlease enter a valid file or dir location.{exception}"
).format(filename=filename,
exception="\nEXCEPTION: {exc}\n".format(exc=exc))
raise IOError(msg)
def __call__(self, string):
if not os.path.isdir(string) and not os.path.isfile(string):
self._raise_invalid_file(string)
return string
class ExistingDirType(ExistingPathType):
def __call__(self, string):
if not os.path.isdir(string):
self._raise_invalid_file(string)
return string
class ExistingFileType(ExistingPathType):
def __call__(self, string):
if not os.path.isfile(string):
self._raise_invalid_file(string)
return string
class TemplateType(ExistingPathType):
"""Reads a file/directory to collect request templates."""
def __init__(self, mode, bufsize):
self._mode = mode
self._bufsize = bufsize
def _fetch_from_dir(self, string):
for path, _, files in os.walk(string):
for file_ in files:
file_path = os.path.join(path, file_)
yield self._fetch_from_file(file_path)
def _fetch_from_file(self, string):
try:
with open(string, self._mode, self._bufsize) as fp:
return os.path.split(fp.name)[1], fp.read()
except IOError as exc:
self._raise_invalid_file(string, exc=exc)
def __call__(self, string):
"""Yield the name and contents of the file(s)
:param str string: the value supplied as the argument
:rtype: tuple
:returns: (file name, file contents)
"""
super(TemplateType, self).__call__(string)
if os.path.isdir(string):
return self._fetch_from_dir(string)
elif os.path.isfile(string):
return [self._fetch_from_file(string)]
syntribos_group = cfg.OptGroup(name="syntribos", title="Main Syntribos Config")
user_group = cfg.OptGroup(name="user", title="Identity Config")
test_group = cfg.OptGroup(name="test", title="Test Config")
def list_opts():
results = []
results.append((None, list_cli_opts()))
results.append((None, list_syntribos_opts()))
results.append((user_group, list_user_opts()))
results.append((test_group, list_test_opts()))
return results
def register_opts():
# CLI options
CONF.register_cli_opts(list_cli_opts())
# Syntribos options
CONF.register_group(syntribos_group)
CONF.register_opts(list_syntribos_opts(), group=syntribos_group)
# Keystone options
CONF.register_group(user_group)
CONF.register_opts(list_user_opts(), group=user_group)
# Test options
CONF.register_group(test_group)
CONF.register_opts(list_test_opts(), group=test_group)
def list_cli_opts():
return [
cfg.MultiStrOpt("test-types", dest="test_types", short="t",
default=[""],
help="Test types to run against the target API"),
cfg.BoolOpt("verbose", short="v", default=False,
help="Print more information to output"),
cfg.BoolOpt("dry-run", dest="dry_run", short="D", default=False,
help="Don't run tests, just print them out to console"),
cfg.StrOpt("outfile", short="o", default=None,
help="File to print output to"),
cfg.StrOpt("format", dest="output_format", short="f", default="json",
choices=["json"], ignore_case=True,
help="The format for outputting results"),
cfg.StrOpt("min-severity", dest="min_severity", short="S",
default="LOW", choices=syntribos.RANKING,
help="Select a minimum severity for reported defects"),
cfg.StrOpt("min-confidence", dest="min_confidence", short="C",
default="LOW", choices=syntribos.RANKING,
help="Select a minimum confidence for reported defects")
]
def list_syntribos_opts():
return [
cfg.StrOpt("endpoint", default="",
sample_default="http://localhost/app", required=True,
help="The target host to be tested"),
cfg.Opt("templates", type=TemplateType('r', 0), required=True,
help="A directory of template files, or a single template "
"file, to test on the target API"),
cfg.StrOpt("payload_dir", default="", required=True,
help="The location where we can find Syntribos' payloads"),
cfg.StrOpt("log_dir", default="", required=True,
help="Where to save debug log files for a Syntribos run")
]
def list_user_opts():
return [
cfg.StrOpt("username", default="", help="Keystone username"),
cfg.StrOpt("password", default="", help="Keystone user password",
secret=True),
cfg.StrOpt("project", default="", help="Keystone project ID"),
cfg.StrOpt("token", default="", help="Keystone auth token",
secret=True),
cfg.StrOpt("endpoint", default="", help="Keystone endpoint URI")
]
def list_test_opts():
# TODO(cneill): Discover other config options from tests dynamically
return [
cfg.FloatOpt("length_diff_percent", default=200.0,
help="Percentage difference between initial request "
"and test request body length to trigger a signal"),
cfg.FloatOpt("time_diff_percent", default=200.0,
help="Perecentage difference between initial response "
"time and test response time to trigger a signal"),
cfg.IntOpt("max_time", default=10,
help="Maximum absolute time (in seconds) to wait for a "
"response before triggering a timeout signal")
]

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
SEP = "=" * 80
RANKING = ['UNDEFINED', 'LOW', 'MEDIUM', 'HIGH']
RANKING_VALUES = {'UNDEFINED': 0, 'LOW': 1, 'MEDIUM': 2, 'HIGH': 3}
for rank in RANKING_VALUES:

View File

@ -11,27 +11,30 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import os
import pkgutil
import sys
import time
import unittest
from cafe.common.reporting.cclogging import init_root_log_handler
from cafe.configurator.managers import TestEnvManager
import cafe.drivers.base
from oslo_config import cfg
import syntribos.arguments
import syntribos.config
from syntribos.result import IssueTestResult
import syntribos.tests as tests
import syntribos.tests.base
result = None
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class Runner(object):
log_file = ""
@classmethod
def print_tests(cls):
"""Print out all the tests that will be run."""
@ -44,8 +47,6 @@ class Runner(object):
:param package: a package of tests for pkgutil to load
"""
if not os.environ.get("CAFE_CONFIG_FILE_PATH"):
os.environ["CAFE_CONFIG_FILE_PATH"] = "./"
for importer, modname, ispkg in pkgutil.walk_packages(
path=package.__path__,
prefix=package.__name__ + '.',
@ -70,7 +71,6 @@ class Runner(object):
@staticmethod
def print_symbol():
"""Syntribos radiation symbol."""
border = '-' * 40
symbol = """ Syntribos
xxxxxxx
x xxxxxxxxxxxxx x
@ -90,62 +90,69 @@ class Runner(object):
x
=== Automated API Scanning ==="""
print(border)
print(syntribos.SEP)
print(symbol)
print(border)
print(syntribos.SEP)
@staticmethod
def print_log():
@classmethod
def print_log(cls):
"""Print the path to the log folder for this run."""
test_log = os.environ.get("CAFE_TEST_LOG_PATH")
test_log = cls.get_log_file_name()
if test_log:
print("=" * 70)
print("LOG PATH..........: {0}".format(test_log))
print("=" * 70)
print(syntribos.SEP)
print("LOG PATH..........: {path}".format(path=test_log))
print(syntribos.SEP)
@classmethod
def get_default_conf_files(cls):
return ["~/.syntribos/syntribos.conf"]
@classmethod
def get_log_file_name(cls):
if not cls.log_file:
log_dir = CONF.syntribos.log_dir
time_str = datetime.datetime.now().strftime("%Y-%m-%d_%X.%f")
file_name = "{time}.log".format(time=time_str)
cls.log_file = os.path.join(log_dir, file_name)
return cls.log_file
@classmethod
def run(cls):
global result
try:
try:
syntribos.config.register_opts()
CONF(sys.argv[1:],
default_config_files=cls.get_default_conf_files())
logging.basicConfig(filename=cls.get_log_file_name(),
level=logging.DEBUG)
except Exception as exc:
syntribos.config.handle_config_exception(exc)
cls.print_symbol()
usage = """
syntribos <config> <input_file> --test-types=TEST_TYPES
syntribos <config> <input_file> -t TEST_TYPE TEST_TYPE ...
syntribos <config> <input_file>
"""
args, unknown = syntribos.arguments.SyntribosCLI(
usage=usage).parse_known_args()
test_env_manager = TestEnvManager(
"", args.config, test_repo_package_name="os")
test_env_manager.finalize()
cls.set_env()
init_root_log_handler()
cls.print_log()
if not args.output_file:
result = IssueTestResult(
unittest.runner._WritelnDecorator(sys.stdout),
True, 2 if args.verbose else 1)
# 2 == higher verbosity, 1 == normal
verbosity = 2 if CONF.verbose else 1
if not CONF.outfile:
decorator = unittest.runner._WritelnDecorator(sys.stdout)
else:
result = IssueTestResult(
unittest.runner._WritelnDecorator(
open(args.output_file, 'w')),
True, 2 if args.verbose else 1)
decorator = unittest.runner._WritelnDecorator(
open(CONF.outfile, 'w'))
result = IssueTestResult(decorator, True, verbosity)
start_time = time.time()
for file_path, req_str in args.input:
for test_name, test_class in cls.get_tests(args.test_types):
for file_path, req_str in CONF.syntribos.templates:
for test_name, test_class in cls.get_tests(CONF.test_types):
test_class.send_init_request(file_path, req_str)
for test in test_class.get_test_cases(file_path, req_str):
if test:
cls.run_test(test, result, args.dry_run)
cls.print_result(result, start_time, args)
cls.run_test(test, result, CONF.dry_run)
cls.print_result(result, start_time)
except KeyboardInterrupt:
cls.print_result(result, start_time, args)
cafe.drivers.base.print_exception(
"Runner",
"run",
"Keyboard Interrupt, exiting...")
cls.print_result(result, start_time)
print("Keyboard interrupt, exiting...")
exit(0)
@classmethod
@ -167,31 +174,23 @@ class Runner(object):
suite.run(result)
@classmethod
def set_env(cls):
"""Set environment variables for this run."""
config = syntribos.config.MainConfig()
os.environ["SYNTRIBOS_ENDPOINT"] = config.endpoint
@classmethod
def print_result(cls, result, start_time, args):
def print_result(cls, result, start_time):
"""Prints test summary/stats (e.g. # failures) to stdout
:param result: Global result object with all issues/etc.
:type result: :class:`syntribos.result.IssueTestResult`
:param float start_time: Time this run started
:param args: Parsed CLI arguments
:type args: ``argparse.Namespace``
"""
result.printErrors(args.output_format, args.min_severity,
args.min_confidence)
result.printErrors(
CONF.output_format, CONF.min_severity, CONF.min_confidence)
run_time = time.time() - start_time
tests = result.testsRun
failures = len(result.failures)
errors = len(result.errors)
print("\n{0}".format("-" * 70))
print("Ran {0} test{1} in {2:.3f}s".format(
tests, "s" * bool(tests - 1), run_time))
print("\n{sep}\nRan {num} test{suff} in {time:.3f}s".format(
sep=syntribos.SEP, num=tests, suff="s" * bool(tests - 1),
time=run_time))
if failures or errors:
print("\nFAILED ({0}{1}{2})".format(
"failures={0}".format(failures) if failures else "",

View File

@ -11,14 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from oslo_config import cfg
import syntribos
from syntribos.clients.http import client
import syntribos.extensions.identity.client
import syntribos.tests.auth.datagen
from syntribos.tests import base
data_dir = os.environ.get("CAFE_DATA_DIR_PATH")
CONF = cfg.CONF
class BaseAuthTestCase(base.BaseTestCase):
@ -61,14 +63,14 @@ class BaseAuthTestCase(base.BaseTestCase):
is created (in addition to the base case, that is)
"""
alt_user_config = syntribos.extensions.identity.config.UserConfig(
section_name='alt_user')
alt_user_id = alt_user_config.user_id
# TODO(cneill): FIX THIS!
alt_user_id = "1"
if alt_user_id is None:
return
request_obj = syntribos.tests.auth.datagen.AuthParser.create_request(
file_content, os.environ.get("SYNTRIBOS_ENDPOINT"))
file_content, CONF.syntribos.endpoint)
prepared_copy = request_obj.get_prepared_copy()
cls.init_response = cls.client.send_request(prepared_copy)
@ -76,8 +78,8 @@ class BaseAuthTestCase(base.BaseTestCase):
prefix_name = "{filename}_{test_name}_{fuzz_file}_".format(
filename=filename, test_name=cls.test_name, fuzz_file='auth')
main_config = syntribos.config.MainConfig()
version = main_config.version
# TODO(cneill): FIX THIS
version = "v2"
if version is None or version == 'v2':
alt_token = syntribos.extensions.identity.client.get_token_v2(

View File

@ -11,10 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from syntribos.clients.http.models import RequestHelperMixin
from syntribos.clients.http.models import RequestObject
from syntribos.clients.http import parser
from syntribos.extensions.identity.config import UserConfig
CONF = cfg.CONF
class AuthMixin(object):
@ -42,8 +45,7 @@ class AuthRequest(RequestObject, AuthMixin, RequestHelperMixin):
super(AuthRequest, self).prepare_request()
if auth_type != "url":
self.url = self.remove_braces(self.url)
user_config = UserConfig(section_name='user')
user_id = user_config.user_id
user_id = CONF.user.project
self.url = self.url.replace('USER_ID', user_id)

View File

@ -11,10 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import string as t_string
import unittest
import cafe.drivers.unittest.fixtures
from oslo_config import cfg
import six
from six.moves.urllib.parse import urlparse
@ -27,6 +27,7 @@ from syntribos.signal import SignalHolder
ALLOWED_CHARS = "().-_{0}{1}".format(t_string.ascii_letters, t_string.digits)
"""test_table is the master list of tests to be run by the runner"""
CONF = cfg.CONF
test_table = {}
@ -73,7 +74,7 @@ class TestType(type):
@six.add_metaclass(TestType)
class BaseTestCase(cafe.drivers.unittest.fixtures.BaseTestFixture):
class BaseTestCase(unittest.TestCase):
"""Base class for building new tests
@ -108,6 +109,10 @@ class BaseTestCase(cafe.drivers.unittest.fixtures.BaseTestFixture):
test_signals = SignalHolder()
diff_signals = SignalHolder()
@classmethod
def register_opts(cls):
pass
@classmethod
def get_test_cases(cls, filename, file_content):
"""Returns tests for given TestCase class (overwritten by children)."""
@ -116,7 +121,7 @@ class BaseTestCase(cafe.drivers.unittest.fixtures.BaseTestFixture):
@classmethod
def send_init_request(cls, filename, file_content, parser=parser):
request_obj = parser.create_request(
file_content, os.environ.get("SYNTRIBOS_ENDPOINT"))
file_content, CONF.syntribos.endpoint)
prepared_copy = request_obj.get_prepared_copy()
cls.init_resp, cls.init_signals = cls.client.send_request(
prepared_copy)

View File

@ -13,25 +13,26 @@
# limitations under the License.
import os
from oslo_config import cfg
from six.moves.urllib.parse import urlparse
import syntribos
from syntribos.checks import length_diff as length_diff
from syntribos.tests import base
import syntribos.tests.fuzz.config
import syntribos.tests.fuzz.datagen
data_dir = os.environ.get("CAFE_DATA_DIR_PATH", "")
CONF = cfg.CONF
payload_dir = CONF.syntribos.payload_dir
class BaseFuzzTestCase(base.BaseTestCase):
config = syntribos.tests.fuzz.config.BaseFuzzConfig()
failure_keys = None
success_keys = None
@classmethod
def _get_strings(cls, file_name=None):
path = os.path.join(data_dir, file_name or cls.data_key)
path = os.path.join(payload_dir, file_name or cls.data_key)
with open(path, "rb") as fp:
return fp.read().splitlines()
@ -120,7 +121,7 @@ class BaseFuzzTestCase(base.BaseTestCase):
"returned when sending an attack string "
"exceeds {0} percent, which could indicate a "
"vulnerability to injection attacks"
).format(self.config.percent)
).format(CONF.test.length_diff_percent)
self.register_issue(
defect_type="length_diff", severity=syntribos.LOW,
confidence=syntribos.LOW, description=description

View File

@ -50,7 +50,7 @@ class CommandInjectionBody(base_fuzz.BaseFuzzTestCase):
"the request and the arrival of the res"
"ponse exceeds the expected amount of time, "
"suggesting a vulnerability to command "
"injection attacks.").format(self.resp.elapsed))
"injection attacks."))
class CommandInjectionParams(CommandInjectionBody):

View File

@ -1,28 +0,0 @@
# Copyright 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cafe.engine.models.data_interfaces import(
ConfigSectionInterface as ConfigSectionInterface
)
class BaseFuzzConfig(ConfigSectionInterface):
SECTION_NAME = "fuzz"
@property
def percent(self):
return float(self.get("percent", 200.0))
@property
def time_difference_percent(self):
return float(self.get("time_difference_percent", 1000.0))

View File

@ -11,19 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from oslo_config import cfg
import syntribos
from syntribos.checks import time_diff as time_diff
from syntribos.tests.fuzz import base_fuzz
import syntribos.tests.fuzz.datagen
CONF = cfg.CONF
class XMLExternalEntityBody(base_fuzz.BaseFuzzTestCase):
test_name = "XML_EXTERNAL_ENTITY_BODY"
test_type = "data"
dtds_data_key = "xml-external.txt"
config = syntribos.tests.fuzz.config.BaseFuzzConfig()
failure_keys = [
'root:',
'root@',
@ -44,7 +45,7 @@ class XMLExternalEntityBody(base_fuzz.BaseFuzzTestCase):
"""
# Send request for different content-types
request_obj = syntribos.tests.fuzz.datagen.FuzzParser.create_request(
file_content, os.environ.get("SYNTRIBOS_ENDPOINT"))
file_content, CONF.syntribos.endpoint)
prepared_copy = request_obj.get_prepared_copy()
prepared_copy.headers['content-type'] = "application/json"

View File

@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from oslo_config import cfg
import syntribos
from syntribos.clients.http import client
@ -19,6 +19,9 @@ from syntribos.clients.http import parser
from syntribos.tests import base
CONF = cfg.CONF
class CorsHeader(base.BaseTestCase):
"""Test to check if CORS header variables are set to wild characters."""
@ -32,7 +35,7 @@ class CorsHeader(base.BaseTestCase):
def get_test_cases(cls, filename, file_content):
request_obj = parser.create_request(
file_content, os.environ.get("SYNTRIBOS_ENDPOINT")
file_content, CONF.syntribos.endpoint
)
request_obj.headers['Origin'] = 'http://example.com'
cls.test_resp, cls.test_signals = cls.client.send_request(request_obj)

View File

@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from oslo_config import cfg
import syntribos
from syntribos.checks import https_check
@ -20,6 +20,9 @@ from syntribos.clients.http import parser
from syntribos.tests import base
CONF = cfg.CONF
class SSLTestCase(base.BaseTestCase):
test_name = "SSL"
@ -31,7 +34,7 @@ class SSLTestCase(base.BaseTestCase):
def get_test_cases(cls, filename, file_content):
request_obj = parser.create_request(
file_content, os.environ.get("SYNTRIBOS_ENDPOINT")
file_content, CONF.syntribos.endpoint
)
cls.test_resp, cls.test_signals = cls.client.send_request(request_obj)
yield cls