Add extension entry-points and loading

This allows Bandit to be extended by third-party packages with both
plugins and formatters. It also updates Bandit's existing in-tree
formatters to be loaded by the plugin manager. When running

    $ bandit -h

The loaded plugins will be displayed to the user if any are installed.

Change-Id: I102277dcd9481f2573028a436e910eda10011d91
This commit is contained in:
Ian Cordasco 2015-06-05 22:23:34 -05:00
parent fe0bc8977d
commit 1c4d8dfd40
6 changed files with 361 additions and 1 deletions

View File

@ -162,6 +162,53 @@ To write a test:
vulnerability might present itself and extend the example file and the test
function accordingly.
Extending Bandit
Bandit allows users to write and register extensions for checks and formatters.
Bandit will load plugins from two entry-points:
- `bandit.formatters`
- `bandit.plugins`
Formatters need to accept 4 things:
- `result_store`: An instance of `bandit.core.BanditResultStore`
- `file_list`: The list of files which were inspected in the scope
- `scores`: The scores awarded to each file in the scope
- `excluded_files`: The list of files that were excluded from the scope
Plugins tend to take advantage of the `bandit.checks` decorator which allows
the author to register a check for a particular type of AST node. For example,
def prohibit_unsafe_deserialization(context):
if 'unsafe_load' in context.call_function_name_qual:
return bandit.Issue(
text="Unsafe deserialization detected."
To register your plugin, you have two options:
1. If you're using setuptools directly, add something like the following to
your `setup` call:
# If you have an imaginary bson formatter in the bandit_bson module
# and a function called `formatter`.
entry_points={'bandit.formatters': ['bson = bandit_bson:formatter']}
# Or a check for using mako templates in bandit_mako that
entry_points={'bandit.plugins': ['mako = bandit_mako']}
2. If you're using pbr, add something like the following to your `setup.cfg`
bandit.formatters =
bson = bandit_bson:formatter
bandit.plugins =
mako = bandit_mako

View File

@ -20,12 +20,14 @@ import logging
import os
import sys
from bandit.core import extension_loader as ext_loader
from bandit.core import manager as b_manager
default_test_config = 'bandit.yaml'
def main():
extension_mgr = ext_loader.MANAGER
parser = argparse.ArgumentParser(
description='Bandit - a Python source code analyzer.'
@ -66,7 +68,7 @@ def main():
'-f', '--format', dest='output_format', action='store',
default='txt', help='specify output format',
choices=['txt', 'json', 'csv', 'xml']
'-o', '--output', dest='output_file', action='store',
@ -83,6 +85,10 @@ def main():
parser.epilog = ('The following plugin suites were discovered and'
' loaded: [' +
', '.join(extension_mgr.plugin_names) + ']')
# setup work - parse arguments, and initialize BanditManager
args = parser.parse_args()
config_file = args.config_file

View File

@ -0,0 +1,53 @@
# -*- coding:utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import extension
class Manager(object):
def __init__(self, formatters_namespace='bandit.formatters',
# Cache the extension managers, loaded extensions, and extension names
self.formatters = list(self.formatters_mgr)
self.formatter_names = self.formatters_mgr.names()
self.plugins = list(self.plugins_mgr)
self.plugin_names = self.plugins_mgr.names()
def load_formatters(self, formatters_namespace):
self.formatters_mgr = extension.ExtensionManager(
# We don't want to call the formatter when we load it.
# We don't care if the extension doesn't have the dependencies it
# needs to start up.
def load_plugins(self, plugins_namespace):
# See comments in load_formatters for parameter explanations
self.plugins_mgr = extension.ExtensionManager(
# Using entry-points and pkg_resources *can* be expensive. So let's load these
# once, store them on the object, and have a module global object for
# accessing them. After the first time this module is imported, it should save
# this attribute on the module and not have to reload the entry-points.
MANAGER = Manager()

bandit/core/ Normal file
View File

@ -0,0 +1,248 @@
# -*- coding:utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import csv
import datetime
import json
from operator import itemgetter
import six
from bandit.core import constants
def report_csv(result_store, file_list, scores, excluded_files):
'''Prints/returns warnings in JSON format
:param files_list: Which files were inspected
:param scores: The scores awarded to each file in the scope
:param excluded_files: Which files were excluded from the scope
:return: A collection containing the CSV data
results = result_store._get_issue_list()
# Remove the code from all the issues in the list, as we will not
# be including it in the CSV data.
def del_code(issue):
del issue['code']
map(del_code, results)
if result_store.out_file is None:
result_store.out_file = 'bandit_results.csv'
with open(result_store.out_file, 'w') as fout:
fieldnames = ['filename',
writer = csv.DictWriter(fout, fieldnames=fieldnames,
for result in results:
print("CSV output written to file: %s" % result_store.out_file)
def report_json(result_store, file_list, scores, excluded_files):
'''Prints/returns warnings in JSON format
:param files_list: Which files were inspected
:param scores: The scores awarded to each file in the scope
:param excluded_files: Which files were excluded from the scope
:return: JSON string
stats = dict(zip(file_list, scores))
machine_output = dict({'results': [], 'errors': [], 'stats': []})
collector = list()
for (fname, reason) in result_store.skipped:
machine_output['errors'].append({'filename': fname,
'reason': reason})
for filer, score in six.iteritems(stats):
totals = {}
for i in range(result_store.level, len(constants.RANKING)):
severity = constants.RANKING[i]
severity_value = constants.RANKING_VALUES[severity]
sc = score['SEVERITY'][i] / severity_value
except ZeroDivisionError:
sc = 0
totals[severity] = sc
'filename': filer,
'score': result_store._sum_scores(score),
'issue totals': totals})
collector = result_store._get_issue_list()
if result_store.agg_type == 'vuln':
machine_output['results'] = sorted(collector,
machine_output['results'] = sorted(collector,
result = json.dumps(machine_output, sort_keys=True,
indent=2, separators=(',', ': '))
if result_store.out_file:
with open(result_store.out_file, 'w') as fout:
# XXX: Should this be log output? (ukbelch)
print("JSON output written to file: %s" % result_store.out_file)
def report_text(result_store, files_list, scores, excluded_files):
'''Prints the contents of the result store
:param files_list: Which files were inspected
:param scores: The scores awarded to each file in the scope
:param excluded_files: List of files excluded from the scope
:return: TXT string with appropriate TTY coloring for terminals
tmpstr_list = []
# use a defaultdict to default to an empty string
color = collections.defaultdict(str)
if result_store.format == 'txt':
# get text colors from settings for TTY output
get_setting = result_store.config.get_setting
color = {'HEADER': get_setting('color_HEADER'),
'DEFAULT': get_setting('color_DEFAULT'),
'LOW': get_setting('color_LOW'),
'MEDIUM': get_setting('color_MEDIUM'),
'HIGH': get_setting('color_HIGH')
# print header
tmpstr_list.append("%sRun started:%s\n\t%s\n" % (
if result_store.verbose:
# print which files were inspected
tmpstr_list.append("\n%sFiles in scope (%s):%s\n" % (
color['HEADER'], len(files_list),
for item in zip(files_list, map(result_store._sum_scores, scores)):
tmpstr_list.append("\t%s (score: %i)\n" % item)
# print which files were excluded and why
tmpstr_list.append("\n%sFiles excluded (%s):%s\n" %
(color['HEADER'], len(excluded_files),
for fname in excluded_files:
tmpstr_list.append("\t%s\n" % fname)
# print which files were skipped and why
tmpstr_list.append("\n%sFiles skipped (%s):%s\n" % (
color['HEADER'], len(result_store.skipped),
for (fname, reason) in result_store.skipped:
tmpstr_list.append("\t%s (%s)\n" % (fname, reason))
# print the results
tmpstr_list.append("\n%sTest results:%s\n" % (
color['HEADER'], color['DEFAULT']
if result_store.count == 0:
tmpstr_list.append("\tNo issues identified.\n")
for filename, issues in result_store.resstore.items():
for issue in issues:
# if the result isn't filtered out by severity
if result_store._check_severity(issue['issue_severity']):
tmpstr_list.append("\n%s>> Issue: %s\n" % (
color.get(issue['issue_severity'], color['DEFAULT']),
tmpstr_list.append(" Severity: %s Confidence: %s\n" % (
tmpstr_list.append(" Location: %s:%s\n" % (
result_store._get_code(issue, True))
result = ''.join(tmpstr_list)
if result_store.out_file:
with open(result_store.out_file, 'w') as fout:
fout.write(result)"Text output written to file: %s",
def report_xml(result_store, file_list, scores, excluded_files):
'''Prints/returns warnings in XML format (Xunit compatible)
:param files_list: Which files were inspected
:param scores: The scores awarded to each file in the scope
:param excluded_files: Which files were excluded from the scope
:return: A collection containing the XML data
import xml.etree.cElementTree as ET
if result_store.out_file is None:
result_store.out_file = 'bandit_results.xml'
items = result_store.resstore.items()
root = ET.Element('testsuite', name='bandit', tests=str(len(items)))
for filename, issues in items:
for issue in issues:
test = issue['test']
testcase = ET.SubElement(root, 'testcase',
classname=filename, name=test)
if result_store._check_severity(issue['issue_severity']):
text = 'Severity: %s Confidence: %s\n%s\nLocation %s:%s'
text = text % (
issue['issue_severity'], issue['issue_confidence'],
issue['issue_text'], issue['fname'], issue['lineno'])
ET.SubElement(testcase, 'error',
message=issue['issue_text']).text = text
tree = ET.ElementTree(root)
tree.write(result_store.out_file, encoding='utf-8', xml_declaration=True)
print("XML output written to file: %s" % result_store.out_file)

View File

@ -1,2 +1,3 @@
stevedore>=1.5.0 # Apache 2.0

View File

@ -23,6 +23,11 @@ classifier =
console_scripts =
bandit = bandit.bandit:main
bandit.formatters =
csv = bandit.core.formatters:report_csv
json = bandit.core.formatters:report_json
txt = bandit.core.formatters:report_text
xml = bandit.core.formatters:report_xml
package_data =