diff --git a/README.md b/README.md
index acf961cc..2c042309 100644
--- a/README.md
+++ b/README.md
@@ -162,6 +162,53 @@ To write a test:
 vulnerability might present itself and extend the example file and the test
 function accordingly.
 
+Extending Bandit
+----------------
+
+Bandit allows users to write and register extensions for checks and formatters.
+Bandit will load plugins from two entry-points:
+
+- `bandit.formatters`
+- `bandit.plugins`
+
+Formatters need to accept four arguments:
+
+- `result_store`: An instance of `bandit.core.BanditResultStore`
+- `file_list`: The list of files which were inspected in the scope
+- `scores`: The scores awarded to each file in the scope
+- `excluded_files`: The list of files that were excluded from the scope
+
+Plugins tend to take advantage of the `bandit.checks` decorator, which allows
+the author to register a check for a particular type of AST node. For example:
+
+    @bandit.checks('Call')
+    def prohibit_unsafe_deserialization(context):
+        if 'unsafe_load' in context.call_function_name_qual:
+            return bandit.Issue(
+                severity=bandit.HIGH,
+                confidence=bandit.HIGH,
+                text="Unsafe deserialization detected."
+            )
+
+To register your plugin, you have two options:
+
+1. If you're using setuptools directly, add something like the following to
+   your `setup` call:
+
+        # If you have an imaginary bson formatter in the bandit_bson module
+        # and a function called `formatter`.
+        entry_points={'bandit.formatters': ['bson = bandit_bson:formatter']}
+        # Or a check for using mako templates in the bandit_mako module.
+        entry_points={'bandit.plugins': ['mako = bandit_mako']}
+
+2. If you're using pbr, add something like the following to your `setup.cfg`
+   file:
+
+    [entry_points]
+    bandit.formatters =
+        bson = bandit_bson:formatter
+    bandit.plugins =
+        mako = bandit_mako
 
 Contributing
 ------------
diff --git a/bandit/bandit.py b/bandit/bandit.py
index f55f1dee..bc1149f2 100755
--- a/bandit/bandit.py
+++ b/bandit/bandit.py
@@ -20,12 +20,14 @@ import logging
 import os
 import sys
 
+from bandit.core import extension_loader as ext_loader
 from bandit.core import manager as b_manager
 
 default_test_config = 'bandit.yaml'
 
 
 def main():
+    extension_mgr = ext_loader.MANAGER
     parser = argparse.ArgumentParser(
         description='Bandit - a Python source code analyzer.'
     )
@@ -66,7 +68,7 @@ def main():
     parser.add_argument(
         '-f', '--format', dest='output_format', action='store',
         default='txt', help='specify output format',
-        choices=['txt', 'json', 'csv', 'xml']
+        choices=sorted(extension_mgr.formatter_names)
     )
     parser.add_argument(
         '-o', '--output', dest='output_file', action='store',
@@ -83,6 +85,10 @@ def main():
     parser.set_defaults(debug=False)
     parser.set_defaults(verbose=False)
 
+    parser.epilog = ('The following plugin suites were discovered and'
+                     ' loaded: [' +
+                     ', '.join(extension_mgr.plugin_names) + ']')
+
     # setup work - parse arguments, and initialize BanditManager
     args = parser.parse_args()
     config_file = args.config_file
diff --git a/bandit/core/extension_loader.py b/bandit/core/extension_loader.py
new file mode 100644
index 00000000..b4374190
--- /dev/null
+++ b/bandit/core/extension_loader.py
@@ -0,0 +1,53 @@
+# -*- coding:utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from stevedore import extension
+
+
+class Manager(object):
+    def __init__(self, formatters_namespace='bandit.formatters',
+                 plugins_namespace='bandit.plugins'):
+        # Cache the extension managers, loaded extensions, and extension names
+        self.load_formatters(formatters_namespace)
+        self.formatters = list(self.formatters_mgr)
+        self.formatter_names = self.formatters_mgr.names()
+
+        self.load_plugins(plugins_namespace)
+        self.plugins = list(self.plugins_mgr)
+        self.plugin_names = self.plugins_mgr.names()
+
+    def load_formatters(self, formatters_namespace):
+        self.formatters_mgr = extension.ExtensionManager(
+            namespace=formatters_namespace,
+            # We don't want to call the formatter when we load it.
+            invoke_on_load=False,
+            # We don't care if the extension doesn't have the dependencies it
+            # needs to start up.
+            verify_requirements=False,
+            )
+
+    def load_plugins(self, plugins_namespace):
+        # See comments in load_formatters for parameter explanations.
+        self.plugins_mgr = extension.ExtensionManager(
+            namespace=plugins_namespace,
+            invoke_on_load=False,
+            verify_requirements=False,
+            )
+
+
+# Using entry-points and pkg_resources *can* be expensive. So let's load these
+# once, store them on the object, and have a module global object for
+# accessing them. After the first time this module is imported, it should save
+# this attribute on the module and not have to reload the entry-points.
+MANAGER = Manager()
diff --git a/bandit/core/formatters.py b/bandit/core/formatters.py
new file mode 100644
index 00000000..fd21adaf
--- /dev/null
+++ b/bandit/core/formatters.py
@@ -0,0 +1,248 @@
+# -*- coding:utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import collections
+import csv
+import datetime
+import json
+from operator import itemgetter
+
+import six
+
+from bandit.core import constants
+
+
+def report_csv(result_store, file_list, scores, excluded_files):
+    '''Prints/returns warnings in CSV format
+
+    :param file_list: Which files were inspected
+    :param scores: The scores awarded to each file in the scope
+    :param excluded_files: Which files were excluded from the scope
+    :return: None; the CSV data is written to result_store.out_file
+    '''
+
+    results = result_store._get_issue_list()
+
+    # Remove the code from all the issues in the list, as we will not
+    # be including it in the CSV data.
+    # (A plain loop is used because map() would be lazy under Python 3.)
+    for issue in results:
+        del issue['code']
+
+    if result_store.out_file is None:
+        result_store.out_file = 'bandit_results.csv'
+
+    with open(result_store.out_file, 'w') as fout:
+        fieldnames = ['filename',
+                      'test_name',
+                      'issue_severity',
+                      'issue_confidence',
+                      'issue_text',
+                      'line_number',
+                      'line_range']
+
+        writer = csv.DictWriter(fout, fieldnames=fieldnames,
+                                extrasaction='ignore')
+        writer.writeheader()
+        for result in results:
+            writer.writerow(result)
+
+    print("CSV output written to file: %s" % result_store.out_file)
+
+
+def report_json(result_store, file_list, scores, excluded_files):
+    '''Prints/returns warnings in JSON format
+
+    :param file_list: Which files were inspected
+    :param scores: The scores awarded to each file in the scope
+    :param excluded_files: Which files were excluded from the scope
+    :return: None; the JSON output is printed or written to a file
+    '''
+
+    stats = dict(zip(file_list, scores))
+
+    machine_output = dict({'results': [], 'errors': [], 'stats': []})
+    collector = list()
+    for (fname, reason) in result_store.skipped:
+        machine_output['errors'].append({'filename': fname,
+                                         'reason': reason})
+
+    for filer, score in six.iteritems(stats):
+        totals = {}
+        for i in range(result_store.level, len(constants.RANKING)):
+            severity = constants.RANKING[i]
+            severity_value = constants.RANKING_VALUES[severity]
+            try:
+                sc = score['SEVERITY'][i] / severity_value
+            except ZeroDivisionError:
+                sc = 0
+            totals[severity] = sc
+
+        machine_output['stats'].append({
+            'filename': filer,
+            'score': result_store._sum_scores(score),
+            'issue totals': totals})
+
+    collector = result_store._get_issue_list()
+
+    if result_store.agg_type == 'vuln':
+        machine_output['results'] = sorted(collector,
+                                           key=itemgetter('error_type'))
+    else:
+        machine_output['results'] = sorted(collector,
+                                           key=itemgetter('filename'))
+
+    result = json.dumps(machine_output, sort_keys=True,
+                        indent=2, separators=(',', ': '))
+
+    if result_store.out_file:
+        with open(result_store.out_file, 'w') as fout:
+            fout.write(result)
+        # XXX: Should this be log output? (ukbelch)
+        print("JSON output written to file: %s" % result_store.out_file)
+    else:
+        print(result)
+
+
+def report_text(result_store, files_list, scores, excluded_files):
+    '''Prints the contents of the result store
+
+    :param files_list: Which files were inspected
+    :param scores: The scores awarded to each file in the scope
+    :param excluded_files: List of files excluded from the scope
+    :return: None; the TXT output is printed or written to a file
+    '''
+
+    tmpstr_list = []
+
+    # use a defaultdict to default to an empty string
+    color = collections.defaultdict(str)
+
+    if result_store.format == 'txt':
+        # get text colors from settings for TTY output
+        get_setting = result_store.config.get_setting
+        color = {'HEADER': get_setting('color_HEADER'),
+                 'DEFAULT': get_setting('color_DEFAULT'),
+                 'LOW': get_setting('color_LOW'),
+                 'MEDIUM': get_setting('color_MEDIUM'),
+                 'HIGH': get_setting('color_HIGH')
+                 }
+
+    # print header
+    tmpstr_list.append("%sRun started:%s\n\t%s\n" % (
+        color['HEADER'],
+        color['DEFAULT'],
+        datetime.datetime.utcnow()
+    ))
+
+    if result_store.verbose:
+        # print which files were inspected
+        tmpstr_list.append("\n%sFiles in scope (%s):%s\n" % (
+            color['HEADER'], len(files_list),
+            color['DEFAULT']
+        ))
+
+        for item in zip(files_list, map(result_store._sum_scores, scores)):
+            tmpstr_list.append("\t%s (score: %i)\n" % item)
+
+        # print which files were excluded and why
+        tmpstr_list.append("\n%sFiles excluded (%s):%s\n" %
+                           (color['HEADER'], len(excluded_files),
+                            color['DEFAULT']))
+        for fname in excluded_files:
+            tmpstr_list.append("\t%s\n" % fname)
+
+    # print which files were skipped and why
+    tmpstr_list.append("\n%sFiles skipped (%s):%s\n" % (
+        color['HEADER'], len(result_store.skipped),
+        color['DEFAULT']
+    ))
+
+    for (fname, reason) in result_store.skipped:
+        tmpstr_list.append("\t%s (%s)\n" % (fname, reason))
+
+    # print the results
+    tmpstr_list.append("\n%sTest results:%s\n" % (
+        color['HEADER'], color['DEFAULT']
+    ))
+
+    if result_store.count == 0:
+        tmpstr_list.append("\tNo issues identified.\n")
+
+    for filename, issues in result_store.resstore.items():
+        for issue in issues:
+
+            # if the result isn't filtered out by severity
+            if result_store._check_severity(issue['issue_severity']):
+                tmpstr_list.append("\n%s>> Issue: %s\n" % (
+                    color.get(issue['issue_severity'], color['DEFAULT']),
+                    issue['issue_text']
+                ))
+                tmpstr_list.append("   Severity: %s   Confidence: %s\n" % (
+                    issue['issue_severity'].capitalize(),
+                    issue['issue_confidence'].capitalize()
+                ))
+                tmpstr_list.append("   Location: %s:%s\n" % (
+                    issue['fname'],
+                    issue['lineno']
+                ))
+                tmpstr_list.append(color['DEFAULT'])
+
+                tmpstr_list.append(
+                    result_store._get_code(issue, True))
+
+    result = ''.join(tmpstr_list)
+
+    if result_store.out_file:
+        with open(result_store.out_file, 'w') as fout:
+            fout.write(result)
+        result_store.logger.info("Text output written to file: %s",
+                                 result_store.out_file)
+    else:
+        print(result)
+
+
+def report_xml(result_store, file_list, scores, excluded_files):
+    '''Prints/returns warnings in XML format (Xunit compatible)
+
+    :param file_list: Which files were inspected
+    :param scores: The scores awarded to each file in the scope
+    :param excluded_files: Which files were excluded from the scope
+    :return: None; the XML data is written to result_store.out_file
+    '''
+
+    import xml.etree.cElementTree as ET
+
+    if result_store.out_file is None:
+        result_store.out_file = 'bandit_results.xml'
+
+    items = result_store.resstore.items()
+    root = ET.Element('testsuite', name='bandit',
+                      tests=str(len(items)))
+    for filename, issues in items:
+        for issue in issues:
+            test = issue['test']
+            testcase = ET.SubElement(root, 'testcase',
+                                     classname=filename, name=test)
+            if result_store._check_severity(issue['issue_severity']):
+                text = 'Severity: %s Confidence: %s\n%s\nLocation %s:%s'
+                text = text % (
+                    issue['issue_severity'], issue['issue_confidence'],
+                    issue['issue_text'], issue['fname'], issue['lineno'])
+                ET.SubElement(testcase, 'error',
+                              type=issue['issue_severity'],
+                              message=issue['issue_text']).text = text
+
+    tree = ET.ElementTree(root)
+    tree.write(result_store.out_file, encoding='utf-8', xml_declaration=True)
+
+    print("XML output written to file: %s" % result_store.out_file)
diff --git a/requirements.txt b/requirements.txt
index af605add..3c8725b1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 PyYAML>=3.1.0
 six>=1.9.0
+stevedore>=1.5.0 # Apache 2.0
diff --git a/setup.cfg b/setup.cfg
index f85af38b..e99df33e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -23,6 +23,11 @@ classifier =
 [entry_points]
 console_scripts =
     bandit = bandit.bandit:main
+bandit.formatters =
+    csv = bandit.core.formatters:report_csv
+    json = bandit.core.formatters:report_json
+    txt = bandit.core.formatters:report_text
+    xml = bandit.core.formatters:report_xml
 
 [files]
 package_data =
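
For illustration, here is a minimal sketch of what a third-party formatter registered
under the `bandit.formatters` entry-point could look like, following the four-argument
interface documented in the README hunk above. The module name `bandit_yaml` and the
function name `formatter` are hypothetical, and the use of `result_store._get_issue_list()`
and `result_store.out_file` simply mirrors the built-in CSV/JSON formatters in this patch
rather than any documented public API.

    # bandit_yaml.py -- hypothetical third-party formatter (sketch only).
    # It would be registered under the `bandit.formatters` entry-point,
    # for example:  yaml = bandit_yaml:formatter
    import yaml  # PyYAML is already listed in bandit's requirements.txt


    def formatter(result_store, file_list, scores, excluded_files):
        '''Writes the collected issues out as a YAML document.

        The signature matches the four arguments described in the README;
        `scores` is accepted but not used by this sketch.
        '''
        issues = result_store._get_issue_list()
        output = yaml.safe_dump({'results': issues,
                                 'files_in_scope': list(file_list),
                                 'files_excluded': list(excluded_files)},
                                default_flow_style=False)

        if result_store.out_file:
            with open(result_store.out_file, 'w') as fout:
                fout.write(output)
            print("YAML output written to file: %s" % result_store.out_file)
        else:
            print(output)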
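
Similarly, a minimal sketch of the imaginary `bandit_mako` plugin module that the
`mako = bandit_mako` entry-point in the README would point at. It reuses only the pieces
shown in the README's decorator example (`bandit.checks`, `bandit.Issue`,
`context.call_function_name_qual`); the function name and the `bandit.MEDIUM` constant
are assumptions made for the sketch.

    # bandit_mako.py -- hypothetical plugin module (sketch only), the target
    # of the `mako = bandit_mako` entry-point under `bandit.plugins`.
    import bandit


    @bandit.checks('Call')
    def use_of_mako_templates(context):
        # Mako templates do not escape output by default, so calls that
        # construct them are worth flagging for review.
        if 'mako.template.Template' in context.call_function_name_qual:
            return bandit.Issue(
                severity=bandit.MEDIUM,
                confidence=bandit.HIGH,
                text="Potential XSS on mako template usage."
            )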