Improve performance by multithreading test calls

This change:
1) rewrites the runner to spawn a thread pool for each template
   and assigns a worker for each test case
2) makes the output colorized by default
3) makes minor changes to the output

Change-Id: I49906f5daaa339ca9429913680203c762a0ad9fe
This commit is contained in:
Michael Dong 2017-08-16 20:12:39 -05:00
parent cb458c03db
commit c4586a374b
13 changed files with 129 additions and 111 deletions

View File

@ -80,4 +80,5 @@ class HTTPClient(object):
'data': data}, **requestslib_kwargs) 'data': data}, **requestslib_kwargs)
# Make the request # Make the request
return requests.request(method, url, **requestslib_kwargs) return requests.request(method, url, allow_redirects=False,
**requestslib_kwargs)

View File

@ -53,7 +53,7 @@ class SynHTTPClient(HTTPClient):
return (response, signals) return (response, signals)
def send_request(self, request_obj): def send_request(self, request_obj):
"""This sends a request based on a RequestOjbect. """This sends a request based on a RequestObject.
RequestObjects are generated by a parser (e.g. RequestObjects are generated by a parser (e.g.
:class:`syntribos.clients.http.parser.RequestCreator`) from request :class:`syntribos.clients.http.parser.RequestCreator`) from request

View File

@ -17,6 +17,7 @@
# limitations under the License. # limitations under the License.
from copy import deepcopy from copy import deepcopy
import logging import logging
import threading
from time import time from time import time
import requests import requests
@ -27,6 +28,8 @@ import syntribos.checks.http as http_checks
import syntribos.signal import syntribos.signal
from syntribos.utils import string_utils from syntribos.utils import string_utils
lock = threading.Lock()
def log_http_transaction(log, level=logging.DEBUG): def log_http_transaction(log, level=logging.DEBUG):
"""Decorator used for logging requests/response in clients. """Decorator used for logging requests/response in clients.
@ -59,20 +62,13 @@ def log_http_transaction(log, level=logging.DEBUG):
sent to the request() method, to the provided log at the provided sent to the request() method, to the provided log at the provided
log level. log level.
""" """
kwargs_copy = deepcopy(kwargs) kwargs_copy = deepcopy(kwargs)
if kwargs_copy.get("sanitize"): if kwargs_copy.get("sanitize"):
kwargs_copy = string_utils.sanitize_secrets(kwargs_copy) kwargs_copy = string_utils.sanitize_secrets(kwargs_copy)
logline = '{0} {1}'.format(args, string_utils.compress( logline_obj = '{0} {1}'.format(args, string_utils.compress(
kwargs_copy)) kwargs_copy))
try:
log.debug(_safe_decode(logline))
except Exception as exception:
# Ignore all exceptions that happen in logging, then log them
log.info('Exception occurred while logging signature of '
'calling method in http client')
log.exception(exception)
# Make the request and time its execution # Make the request and time its execution
response = None response = None
no_resp_time = None no_resp_time = None
@ -132,7 +128,7 @@ def log_http_transaction(log, level=logging.DEBUG):
request_headers = string_utils.sanitize_secrets( request_headers = string_utils.sanitize_secrets(
request_headers) request_headers)
request_body = string_utils.sanitize_secrets(request_body) request_body = string_utils.sanitize_secrets(request_body)
logline = ''.join([ logline_req = ''.join([
'\n{0}\nREQUEST SENT\n{0}\n'.format('-' * 12), '\n{0}\nREQUEST SENT\n{0}\n'.format('-' * 12),
'request method.......: {0}\n'.format(response.request.method), 'request method.......: {0}\n'.format(response.request.method),
'request url..........: {0}\n'.format(string_utils.compress( 'request url..........: {0}\n'.format(string_utils.compress(
@ -145,15 +141,7 @@ def log_http_transaction(log, level=logging.DEBUG):
'request body size....: {0}\n'.format(req_body_len), 'request body size....: {0}\n'.format(req_body_len),
'request body.........: {0}\n'.format(string_utils.compress 'request body.........: {0}\n'.format(string_utils.compress
(request_body))]) (request_body))])
logline_rsp = ''.join([
try:
log.log(level, _safe_decode(logline))
except Exception as exception:
# Ignore all exceptions that happen in logging, then log them
log.log(level, '\n{0}\nREQUEST INFO\n{0}\n'.format('-' * 12))
log.exception(exception)
logline = ''.join([
'\n{0}\nRESPONSE RECEIVED\n{0}\n'.format('-' * 17), '\n{0}\nRESPONSE RECEIVED\n{0}\n'.format('-' * 17),
'response status..: {0}\n'.format(response), 'response status..: {0}\n'.format(response),
'response headers.: {0}\n'.format(response.headers), 'response headers.: {0}\n'.format(response.headers),
@ -162,12 +150,27 @@ def log_http_transaction(log, level=logging.DEBUG):
'response size....: {0}\n'.format(len(response.content)), 'response size....: {0}\n'.format(len(response.content)),
'response body....: {0}\n'.format(response_content), 'response body....: {0}\n'.format(response_content),
'-' * 79]) '-' * 79])
lock.acquire()
try: try:
log.log(level, _safe_decode(logline)) log.log(level, _safe_decode(logline_req))
except Exception as exception:
# Ignore all exceptions that happen in logging, then log them
log.log(level, '\n{0}\nREQUEST INFO\n{0}\n'.format('-' * 12))
log.exception(exception)
try:
log.log(level, _safe_decode(logline_rsp))
except Exception as exception: except Exception as exception:
# Ignore all exceptions that happen in logging, then log them # Ignore all exceptions that happen in logging, then log them
log.log(level, '\n{0}\nRESPONSE INFO\n{0}\n'.format('-' * 13)) log.log(level, '\n{0}\nRESPONSE INFO\n{0}\n'.format('-' * 13))
log.exception(exception) log.exception(exception)
try:
log.debug(_safe_decode(logline_obj))
except Exception as exception:
# Ignore all exceptions that happen in logging, then log them
log.info('Exception occurred while logging signature of '
'calling method in http client')
log.exception(exception)
lock.release()
return (response, signals) return (response, signals)
return _wrapper return _wrapper
return _decorator return _decorator

View File

@ -157,7 +157,8 @@ def list_cli_opts():
default=[""], sample_default=["SQL", "XSS"], default=[""], sample_default=["SQL", "XSS"],
help=_("Test types to be excluded from " help=_("Test types to be excluded from "
"current run against the target API")), "current run against the target API")),
cfg.BoolOpt("colorize", dest="colorize", short="cl", default=False, cfg.BoolOpt("no_colorize", dest="no_colorize", short="ncl",
default=False,
help=_("Enable color in syntribos terminal output")), help=_("Enable color in syntribos terminal output")),
cfg.StrOpt("outfile", short="o", cfg.StrOpt("outfile", short="o",
sample_default="out.json", help=_("File to print " sample_default="out.json", help=_("File to print "
@ -194,6 +195,10 @@ def list_syntribos_opts():
cfg.StrOpt("endpoint", default="", cfg.StrOpt("endpoint", default="",
sample_default="http://localhost/app", sample_default="http://localhost/app",
help=_("The target host to be tested")), help=_("The target host to be tested")),
cfg.IntOpt("threads", default=16,
sample_default="16",
help=_("Maximum number of threads syntribos spawns "
"(experimental)")),
cfg.Opt("templates", type=ContentType("r", 0), cfg.Opt("templates", type=ContentType("r", 0),
default="", default="",
sample_default="~/.syntribos/templates", sample_default="~/.syntribos/templates",

View File

@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import threading
import time import time
import unittest import unittest
@ -23,6 +24,7 @@ from syntribos.runner import Runner
import syntribos.utils.remotes import syntribos.utils.remotes
CONF = cfg.CONF CONF = cfg.CONF
lock = threading.Lock()
class IssueTestResult(unittest.TextTestResult): class IssueTestResult(unittest.TextTestResult):
@ -38,7 +40,7 @@ class IssueTestResult(unittest.TextTestResult):
"MEDIUM": 0, "MEDIUM": 0,
"HIGH": 0 "HIGH": 0
} }
stats = {"errors": 0, "failures": 0, "successes": 0} stats = {"errors": 0, "unique_failures": 0, "successes": 0}
severity_counter_dict = {} severity_counter_dict = {}
testsRunSinceLastPrint = 0 testsRunSinceLastPrint = 0
failure_id = 0 failure_id = 0
@ -84,6 +86,7 @@ class IssueTestResult(unittest.TextTestResult):
:type test: :class:`syntribos.tests.base.BaseTestCase` :type test: :class:`syntribos.tests.base.BaseTestCase`
:param tuple err: Tuple of format ``(type, value, traceback)`` :param tuple err: Tuple of format ``(type, value, traceback)``
""" """
lock.acquire()
for issue in test.failures: for issue in test.failures:
defect_type = issue.defect_type defect_type = issue.defect_type
if any([ if any([
@ -174,7 +177,7 @@ class IssueTestResult(unittest.TextTestResult):
"signals": signals "signals": signals
} }
failure_obj["instances"].append(instance_obj) failure_obj["instances"].append(instance_obj)
self.stats["failures"] += 1 self.stats["unique_failures"] += 1
self.output["stats"]["severity"][sev_rating] += 1 self.output["stats"]["severity"][sev_rating] += 1
else: else:
instance_obj = None instance_obj = None
@ -196,8 +199,9 @@ class IssueTestResult(unittest.TextTestResult):
"signals": signals "signals": signals
} }
failure_obj["instances"].append(instance_obj) failure_obj["instances"].append(instance_obj)
self.stats["failures"] += 1 self.stats["unique_failures"] += 1
self.output["stats"]["severity"][sev_rating] += 1 self.output["stats"]["severity"][sev_rating] += 1
lock.release()
def addError(self, test, err): def addError(self, test, err):
"""Duplicates parent class addError functionality. """Duplicates parent class addError functionality.
@ -207,11 +211,19 @@ class IssueTestResult(unittest.TextTestResult):
:param err: :param err:
:type tuple: Tuple of format ``(type, value, traceback)`` :type tuple: Tuple of format ``(type, value, traceback)``
""" """
self.errors.append({ with lock:
"test": self.getDescription(test), for e in self.errors:
"error": self._exc_info_to_string(err, test) if e['error'] == self._exc_info_to_string(err, test):
}) if self.getDescription(test) in e['test']:
self.stats["errors"] += 1 return
e['test'].append(self.getDescription(test))
self.stats["errors"] += 1
return
self.errors.append({
"test": [self.getDescription(test)],
"error": self._exc_info_to_string(err, test)
})
self.stats["errors"] += 1
def addSuccess(self, test): def addSuccess(self, test):
"""Duplicates parent class addSuccess functionality. """Duplicates parent class addSuccess functionality.
@ -219,7 +231,8 @@ class IssueTestResult(unittest.TextTestResult):
:param test: The test that was run :param test: The test that was run
:type test: :class:`syntribos.tests.base.BaseTestCase` :type test: :class:`syntribos.tests.base.BaseTestCase`
""" """
self.stats["successes"] += 1 with lock:
self.stats["successes"] += 1
def printErrors(self, output_format): def printErrors(self, output_format):
"""Print out each :class:`syntribos.issue.Issue` that was encountered """Print out each :class:`syntribos.issue.Issue` that was encountered
@ -241,18 +254,19 @@ class IssueTestResult(unittest.TextTestResult):
"""Print the path to the log folder for this run.""" """Print the path to the log folder for this run."""
test_log = Runner.log_path test_log = Runner.log_path
run_time = time.time() - start_time run_time = time.time() - start_time
num_fail = self.stats["failures"] num_fail = self.stats["unique_failures"]
num_err = self.stats["errors"] num_err = self.stats["errors"]
print("\n{sep}\nTotal: Ran {num} test{suff} in {time:.3f}s".format( print("\n{sep}\nTotal: Ran {num} test{suff} in {time:.3f}s".format(
sep=syntribos.SEP, sep=syntribos.SEP,
num=self.testsRun, num=self.testsRun,
suff="s" * bool(self.testsRun - 1), suff="s" * bool(self.testsRun - 1),
time=run_time)) time=run_time))
print("Total: {f} failure{fsuff} and {e} error{esuff}".format( print("Total: {f} unique failure{fsuff} "
f=num_fail, "and {e} unique error{esuff}".format(
e=num_err, f=num_fail,
fsuff="s" * bool(num_fail - 1), e=num_err,
esuff="s" * bool(num_err - 1))) fsuff="s" * bool(num_fail - 1),
esuff="s" * bool(num_err - 1)))
if test_log: if test_log:
print(syntribos.SEP) print(syntribos.SEP)
print(_("LOG PATH...: %s") % test_log) print(_("LOG PATH...: %s") % test_log)

View File

@ -13,9 +13,11 @@
# limitations under the License. # limitations under the License.
import json import json
import logging import logging
from multiprocessing.dummy import Pool as ThreadPool
import os import os
import pkgutil import pkgutil
import sys import sys
import threading
import time import time
import traceback import traceback
import unittest import unittest
@ -39,6 +41,7 @@ result = None
user_base_dir = None user_base_dir = None
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
lock = threading.Lock()
class Runner(object): class Runner(object):
@ -125,6 +128,7 @@ class Runner(object):
LOG = logging.getLogger() LOG = logging.getLogger()
LOG.handlers = [log_handle] LOG.handlers = [log_handle]
LOG.setLevel(logging.DEBUG) LOG.setLevel(logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.WARNING)
return LOG return LOG
@classmethod @classmethod
@ -258,7 +262,6 @@ class Runner(object):
cls.meta_dir_dict[meta_path] = json.loads(file_content) cls.meta_dir_dict[meta_path] = json.loads(file_content)
except Exception: except Exception:
print("Unable to parse %s, skipping..." % file_path) print("Unable to parse %s, skipping..." % file_path)
for file_path, req_str in templates_dir: for file_path, req_str in templates_dir:
if "meta.json" in file_path: if "meta.json" in file_path:
continue continue
@ -331,7 +334,7 @@ class Runner(object):
if len(test_cases) > 0: if len(test_cases) > 0:
for test in test_cases: for test in test_cases:
if test: if test:
cls.run_test(test, result) cls.run_test(test)
@classmethod @classmethod
def dry_run_report(cls, output): def dry_run_report(cls, output):
@ -359,6 +362,7 @@ class Runner(object):
:return: None :return: None
""" """
pool = ThreadPool(CONF.syntribos.threads)
try: try:
template_start_time = time.time() template_start_time = time.time()
failures = 0 failures = 0
@ -367,19 +371,18 @@ class Runner(object):
for test_name, test_class in list_of_tests: for test_name, test_class in list_of_tests:
test_class.test_id = cls.current_test_id test_class.test_id = cls.current_test_id
cls.current_test_id += 5 cls.current_test_id += 5
log_string = "[{test_id}] : {name}".format(
test_id=test_class.test_id, name=test_name)
result_string = "[{test_id}] : {name}".format( result_string = "[{test_id}] : {name}".format(
test_id=cli.colorize( test_id=cli.colorize(
test_class.test_id, color="green"), test_class.test_id, color="green"),
name=test_name.replace("_", " ").capitalize()) name=test_name.replace("_", " ").capitalize())
if not CONF.colorize: if CONF.no_colorize:
result_string = result_string.ljust(55) result_string = result_string.ljust(55)
else: else:
result_string = result_string.ljust(60) result_string = result_string.ljust(60)
LOG.debug(log_string)
try: try:
test_class.send_init_request(file_path, req_str, meta_vars) test_class.create_init_request(file_path, req_str,
meta_vars)
except Exception: except Exception:
print(_( print(_(
"Error in parsing template:\n %s\n" "Error in parsing template:\n %s\n"
@ -388,40 +391,33 @@ class Runner(object):
break break
test_cases = list( test_cases = list(
test_class.get_test_cases(file_path, req_str)) test_class.get_test_cases(file_path, req_str))
if len(test_cases) > 0: total_tests = len(test_cases)
if total_tests > 0:
log_string = "[{test_id}] : {name}".format(
test_id=test_class.test_id, name=test_name)
LOG.debug(log_string)
last_failures = result.stats['unique_failures']
last_errors = result.stats['errors']
p_bar = cli.ProgressBar( p_bar = cli.ProgressBar(
message=result_string, total_len=len(test_cases)) message=result_string, total_len=total_tests)
last_failures = result.stats["failures"] test_class.send_init_request(file_path, req_str, meta_vars)
last_errors = result.stats["errors"]
for test in test_cases: # This line runs the tests
if test: pool.map(lambda t: cls.run_test(t, p_bar), test_cases)
cls.run_test(test, result)
p_bar.increment(1) failures = result.stats['unique_failures'] - last_failures
p_bar.print_bar() errors = result.stats['errors'] - last_errors
failures = result.stats["failures"] - last_failures failures_str = cli.colorize_by_percent(
errors = result.stats["errors"] - last_errors failures, total_tests, "red")
total_tests = len(test_cases)
if failures > total_tests * 0.90:
# More than 90 percent failure
failures = cli.colorize(failures, "red")
elif failures > total_tests * 0.45:
# More than 45 percent failure
failures = cli.colorize(failures, "yellow")
elif failures > total_tests * 0.15:
# More than 15 percent failure
failures = cli.colorize(failures, "blue")
if errors: if errors:
last_failures = result.stats["failures"] errors_str = cli.colorize(errors, "red")
last_errors = result.stats["errors"]
errors = cli.colorize(errors, "red")
print(_( print(_(
" : %(fail)s Failure(s), %(err)s Error(s)\r") % { " : %(fail)s Failure(s), %(err)s Error(s)\r") % {
"fail": failures, "err": errors}) "fail": failures_str, "err": errors_str})
else: else:
last_failures = result.stats["failures"] print(_(
print( " : %s Failure(s), 0 Error(s)\r") % failures_str)
_(
" : %s Failure(s), 0 Error(s)\r") % failures)
run_time = time.time() - template_start_time run_time = time.time() - template_start_time
LOG.info(_("Run time: %s sec."), run_time) LOG.info(_("Run time: %s sec."), run_time)
@ -440,26 +436,34 @@ class Runner(object):
result.print_result(cls.start_time) result.print_result(cls.start_time)
cleanup.delete_temps() cleanup.delete_temps()
print(_("Exiting...")) print(_("Exiting..."))
pool.close()
pool.join()
exit(0) exit(0)
print(_('Resuming...')) print(_('Resuming...'))
except KeyboardInterrupt: except KeyboardInterrupt:
result.print_result(cls.start_time) result.print_result(cls.start_time)
cleanup.delete_temps() cleanup.delete_temps()
print(_("Exiting...")) print(_("Exiting..."))
pool.close()
pool.join()
exit(0) exit(0)
@classmethod @classmethod
def run_test(cls, test, result): def run_test(cls, test, p_bar=None):
"""Create a new test suite, add a test, and run it """Create a new test suite, add a test, and run it
:param test: The test to add to the suite :param test: The test to add to the suite
:param result: The result object to append to :param result: The result object to append to
:type result: :class:`syntribos.result.IssueTestResult` :type result: :class:`syntribos.result.IssueTestResult`
:param bool dry_run: (OPTIONAL) Only print out test names
""" """
suite = unittest.TestSuite() if test:
suite.addTest(test("run_test_case")) suite = unittest.TestSuite()
suite.run(result) suite.addTest(test("run_test_case"))
suite.run(result)
if p_bar:
with lock:
p_bar.increment(1)
p_bar.print_bar()
def entry_point(): def entry_point():

View File

@ -148,10 +148,11 @@ class BaseTestCase(unittest.TestCase):
:param str filename: name of template file :param str filename: name of template file
:param str file_content: content of template file as string :param str file_content: content of template file as string
""" """
cls.init_req = parser.create_request( if not cls.init_req:
file_content, CONF.syntribos.endpoint, meta_vars) cls.init_req = parser.create_request(
file_content, CONF.syntribos.endpoint, meta_vars)
prepared_copy = cls.init_req.get_prepared_copy() prepared_copy = cls.init_req.get_prepared_copy()
cls.prepared_init_req = prepared_copy
cls.init_resp, cls.init_signals = cls.client.send_request( cls.init_resp, cls.init_signals = cls.client.send_request(
prepared_copy) prepared_copy)
if cls.init_resp is not None: if cls.init_resp is not None:

View File

@ -67,6 +67,9 @@ class BaseFuzzTestCase(base.BaseTestCase):
headers=cls.request.headers, headers=cls.request.headers,
params=cls.request.params, params=cls.request.params,
data=cls.request.data) data=cls.request.data)
if not hasattr(cls.request, 'body'):
cls.request.body = cls.request.data
cls.test_req = cls.request cls.test_req = cls.request
if cls.test_resp is None or "EXCEPTION_RAISED" in cls.test_signals: if cls.test_resp is None or "EXCEPTION_RAISED" in cls.test_signals:
@ -88,7 +91,6 @@ class BaseFuzzTestCase(base.BaseTestCase):
self.run_default_checks() in order to test for the Issues self.run_default_checks() in order to test for the Issues
defined here defined here
""" """
if "HTTP_STATUS_CODE_5XX" in self.test_signals: if "HTTP_STATUS_CODE_5XX" in self.test_signals:
self.register_issue( self.register_issue(
defect_type="500_errors", defect_type="500_errors",
@ -125,9 +127,6 @@ class BaseFuzzTestCase(base.BaseTestCase):
def get_test_cases(cls, filename, file_content): def get_test_cases(cls, filename, file_content):
"""Generates new TestCases for each fuzz string """Generates new TestCases for each fuzz string
First, sends a baseline (non-fuzzed) request, storing it in
cls.init_resp.
For each string returned by cls._get_strings(), yield a TestCase class For each string returned by cls._get_strings(), yield a TestCase class
for the string as an extension to the current TestCase class. Every for the string as an extension to the current TestCase class. Every
string used as a fuzz test payload entails the generation of a new string used as a fuzz test payload entails the generation of a new
@ -197,8 +196,7 @@ class BaseFuzzTestCase(base.BaseTestCase):
issue.request = self.test_req issue.request = self.test_req
issue.response = self.test_resp issue.response = self.test_resp
issue.test_type = self.test_name issue.test_type = self.test_name
prepared_copy = self.init_req.get_prepared_copy() url_components = urlparse(self.prepared_init_req.url)
url_components = urlparse(prepared_copy.url)
issue.target = url_components.netloc issue.target = url_components.netloc
issue.path = url_components.path issue.path = url_components.path
issue.init_signals = self.init_signals issue.init_signals = self.init_signals

View File

@ -34,7 +34,6 @@ class BufferOverflowBody(base_fuzz.BaseFuzzTestCase):
return [ return [
"A" * (2 ** 16 + 1), "A" * (2 ** 16 + 1),
"a" * 10 ** 5, "a" * 10 ** 5,
"a" * 10 ** 6,
'\x00' * (2 ** 16 + 1), '\x00' * (2 ** 16 + 1),
"%%s" * 513, "%%s" * 513,
] ]

View File

@ -26,22 +26,6 @@ CONF = cfg.CONF
def print_symbol(): def print_symbol():
"""Syntribos radiation symbol.""" """Syntribos radiation symbol."""
symbol = """ Syntribos symbol = """ Syntribos
xxxxxxx
x xxxxxxxxxxxxx x
x xxxxxxxxxxx x
xxxxxxxxx
x xxxxxxx x
xxxxx
x xxx x
x
xxxxxxxxxxxxxxx xxxxxxxxxxxxxxx
xxxxxxxxxxxxx xxxxxxxxxxxxx
xxxxxxxxxxx xxxxxxxxxxx
xxxxxxxxx xxxxxxxxx
xxxxxx xxxxxx
xxx xxx
x x
x
=== Automated API Scanning ===""" === Automated API Scanning ==="""
print(syntribos.SEP) print(syntribos.SEP)
print(symbol) print(symbol)
@ -55,15 +39,24 @@ def colorize(string, color="nocolor"):
colors = dict(list(zip(color_names, list(range(31, 35))))) colors = dict(list(zip(color_names, list(range(31, 35)))))
colors["nocolor"] = 0 # No Color colors["nocolor"] = 0 # No Color
if not CONF.colorize: if CONF.no_colorize:
return string return string
return "\033[0;{color}m{string}\033[0;m".format(string=string, return "\033[0;{color}m{string}\033[0;m".format(string=string,
color=colors.setdefault( color=colors.setdefault(
color, 0)) color, 0))
def colorize_by_percent(amount, total, high=0.5, medium=0):
if amount > total * high:
return colorize(amount, "red")
elif amount > total * medium:
return colorize(amount, "yellow")
else:
return str(amount)
class ProgressBar(object): class ProgressBar(object):
"""A simple progressBar. """A simple progressBar. Written as a singleton.
A simple generic progress bar like many others. A simple generic progress bar like many others.
:param int total_len: total_len value, when progress is 100 % :param int total_len: total_len value, when progress is 100 %

View File

@ -64,7 +64,7 @@ class ConfFixture(config_fixture.Config):
"""config values for CLI options(default group).""" """config values for CLI options(default group)."""
# TODO(unrahul): Add mock file path for outfile # TODO(unrahul): Add mock file path for outfile
self.conf.set_default("test_types", [""]) self.conf.set_default("test_types", [""])
self.conf.set_default("colorize", False) self.conf.set_default("no_colorize", True)
self.conf.set_default("output_format", "json") self.conf.set_default("output_format", "json")
self.conf.set_default("min_severity", "LOW") self.conf.set_default("min_severity", "LOW")
self.conf.set_default("min_confidence", "LOW") self.conf.set_default("min_confidence", "LOW")

View File

@ -20,7 +20,7 @@ from syntribos.utils.cli import CONF
class TestColorize(testtools.TestCase): class TestColorize(testtools.TestCase):
def test_colorize(self): def test_colorize(self):
CONF.colorize = True CONF.no_colorize = False
string = "color this string" string = "color this string"
colors = {"red": 31, colors = {"red": 31,
"green": 32, "green": 32,
@ -34,6 +34,6 @@ class TestColorize(testtools.TestCase):
colorize(string, color)) colorize(string, color))
def test_no_colorize(self): def test_no_colorize(self):
CONF.colorize = False CONF.no_colorize = True
string = "No color" string = "No color"
self.assertEqual(string, colorize(string)) self.assertEqual(string, colorize(string))

View File

@ -52,7 +52,7 @@ class TestIssueTestResult(testtools.TestCase):
def test_addFailure(self): def test_addFailure(self):
test = FakeTest("failure") test = FakeTest("failure")
self.issue_result.addFailure(test, ()) self.issue_result.addFailure(test, ())
self.assertEqual(self.issue_result.stats["failures"], 2) self.assertEqual(self.issue_result.stats["unique_failures"], 2)
def test_addSuccess(self): def test_addSuccess(self):
test = FakeTest("success") test = FakeTest("success")