Address static analysis issues

This patch is meant to address issues found by running the bandit
static analysis tool. Some of the issues are valid vulnerabilities,
while others are false positives. For false positives, the 'nosec'
keyword has been added to allow bandit checks to pass.

Change-Id: Iaa3375f5031e7b86f3d0d54c27cf8f8fc30c90a4
(cherry picked from commit c386d4167c)
(cherry picked from commit 1abd42d30a)
This commit is contained in:
Thomas Bachman 2024-05-23 12:13:06 +00:00
parent d31192da5b
commit f72bd925d8
7 changed files with 62 additions and 51 deletions

View File

@ -14,6 +14,8 @@ import os
from django.conf import settings
from django.urls import reverse
from django.utils.html import format_html
from django.utils.html import format_html_join
from django.utils.safestring import mark_safe
from gbpui import client
@ -29,12 +31,12 @@ def update_pruleset_attributes(request, prset):
rules = prset.policy_rules
url = "horizon:project:application_policy:policyruledetails"
value = ["<ul>"]
li = lambda x: "<li><a href='" + \
reverse(url, kwargs={'policyrule_id': x.id}) + \
"'>" + x.name + "</a></li>"
for rule in rules:
r = client.policyrule_get(request, rule)
value.append(li(r))
li = format_html("<li><a href='{}'>{}</a></li>",
reverse(url, kwargs={'policyrule_id': r.id}),
r.name)
value.append(li)
value.append("</ul>")
value = "".join(value)
setattr(prset, 'policy_rules', mark_safe(value))
@ -47,10 +49,10 @@ def update_service_policy_attributes(policy):
if len(np) > 0:
tags = []
for item in np:
dl = ["<dl class='dl-horizontal'>"]
dl.extend(["<dt>%s<dt><dd>%s</dd>" %
(k, v) for k, v in list(item.items())])
dl.append("</dl>")
dl = [mark_safe("<dl class='dl-horizontal'>")]
dl.extend(format_html_join('', "<dt>{}<dt><dd>{}</dd>",
((k, v) for k, v in list(item.items()))))
dl.append(mark_safe("</dl>"))
tags.append("".join(dl))
params = mark_safe("".join(tags))
setattr(policy, 'network_service_params', params)
@ -64,16 +66,15 @@ def update_policy_target_attributes(request, pt):
provided = [client.policy_rule_set_get(request, item) for item in provided]
consumed = [client.policy_rule_set_get(request, item) for item in consumed]
p = ["<ul>"]
li = lambda x: "<li><a href='" + \
reverse(url, kwargs={'policy_rule_set_id': x.id}) + \
"'>" + x.name + "</a></li>"
li = lambda url, item: (format_html("<li><a href='{}'>{}</a></li>",
reverse(url, kwargs={'policy_rule_set_id': item.id}), item.name))
for item in provided:
p.append(li(item))
p.append(li(url, item))
p.append("</ul>")
p = "".join(p)
c = ["<ul>"]
for item in consumed:
c.append(li(item))
c.append(li(url, item))
c.append("</ul>")
c = "".join(c)
consumed = [item.name for item in consumed]
@ -83,15 +84,14 @@ def update_policy_target_attributes(request, pt):
if hasattr(pt, 'l2_policy_id') and pt.l2_policy_id is not None:
policy = client.l2policy_get(request, pt.l2_policy_id)
u = reverse(l2url, kwargs={'l2policy_id': policy.id})
atag = mark_safe(
"<a href='" + u + "'>" + policy.name + "</a>")
atag = format_html("<a href='{}'>{}</a>", u, policy.name)
setattr(pt, 'l2_policy_id', atag)
if hasattr(pt, 'external_segments'):
exturl = "horizon:project:network_policy:external_connectivity_details"
value = ["<ul>"]
li = lambda x: "<li><a href='" + \
reverse(exturl, kwargs={'external_connectivity_id': x.id}) + \
"'>" + x.name + "</a></li>"
li = lambda x: format_html("<li><a href='{}'>{}</a></li>",
reverse(exturl, kwargs={'external_connectivity_id': x.id}),
x.name)
for external_segment in pt.external_segments:
ext_policy = client.get_externalconnectivity(request,
external_segment)
@ -107,22 +107,22 @@ def update_policyrule_attributes(request, prule):
classifier_id = prule.policy_classifier_id
classifier = client.policyclassifier_get(request, classifier_id)
u = reverse(url, kwargs={'policyclassifier_id': classifier.id})
tag = mark_safe("<a href='" + u + "'>" + classifier.name + "</a>")
tag = format_html("<a href='{}'>{}</a>", u, classifier.name)
setattr(prule, 'policy_classifier_id', tag)
actions = prule.policy_actions
action_url = "horizon:project:application_policy:policyactiondetails"
ul = ["<ul>"]
ul = [mark_safe("<ul>")]
for a in actions:
action = client.policyaction_get(request, a)
u = reverse(action_url, kwargs={'policyaction_id': a})
if action.action_type == 'redirect':
spec = client.get_servicechain_spec(request, action.action_value)
spec_details = "%s:%s" % (spec.name, str(spec.id))
li = "<li><a href='%s'>%s</a></li>" % (u, spec_details)
li = format_html("<li><a href='%s'>%s</a></li>", u, spec_details)
else:
li = "<li><a href='%s'>%s</a></li>" % (u, action.name)
li = format_html("<li><a href='%s'>%s</a></li>", u, action.name)
ul.append(li)
ul.append("</ul>")
ul.append(mark_safe("</ul>"))
ultag = "".join(ul)
setattr(prule, 'policy_actions', mark_safe(ultag))
return prule
@ -292,17 +292,15 @@ def update_classifier_attributes(classifiers):
def update_l3_policy_attributes(request, l3_policy):
url = "horizon:project:network_policy:external_connectivity_details"
if bool(l3_policy.external_segments):
value = ["<ul>"]
li = \
lambda x: "<li><a href='" + \
reverse(url, kwargs={'external_connectivity_id': x.id}) + \
"'>" + x.name + "</a>" + " : " + \
l3_policy.external_segments[x.id][0] + "</li>"
value = [mark_safe("<ul>")]
li = lambda x: format_html("<li><a href='{}'>{}</a> : {}</li>",
reverse(url, kwargs={'external_connectivity_id': x.id}),
x.name, l3_policy.external_segments[x.id][0])
for ec in list(l3_policy.external_segments.keys()):
external_connectivity = client.get_externalconnectivity(request,
ec)
value.append(li(external_connectivity))
value.append("</ul>")
value.append(mark_safe("</ul>"))
tag = mark_safe("".join(value))
else:
tag = '-'
@ -313,15 +311,13 @@ def update_l3_policy_attributes(request, l3_policy):
def update_nat_pool_attributes(request, nat_pool):
url = "horizon:project:network_policy:external_connectivity_details"
id = nat_pool.external_segment_id
value = ["<ul>"]
li = \
lambda x: "<li><a href='" + \
reverse(url, kwargs={'external_connectivity_id': x.id}) + \
"'>" + x.name + "</a>" + "</li>"
value = [mark_safe("<ul>")]
li = lambda x: format_html("<li><a href='{}'>{}</a></li>",
reverse(url, kwargs={'external_connectivity_id': x.id}), x.name)
external_connectivity = client.get_externalconnectivity(request,
id)
value.append(li(external_connectivity))
value.append("</ul>")
value.append(mark_safe("</ul>"))
tag = mark_safe("".join(value))
setattr(nat_pool, 'external_segment_id', tag)
return nat_pool

View File

@ -88,11 +88,11 @@ class DropdownEditWidget(TextInput):
def render(self, name, value, attrs=None):
text_html = super(DropdownEditWidget, self).render(
name, value, attrs=attrs)
data_list = '<datalist id="list__%s">' % self._name
data_list = [format_html('<datalist id="list__{}">', self._name)]
for item in self._list:
data_list += '<option value="%s">' % item
data_list += '</datalist>'
return mark_safe(text_html + data_list)
data_list.append(format_html('<option value="{}">', item))
data_list.append(mark_safe('</datalist>'))
return mark_safe(text_html + mark_safe("".join(data_list)))
class TransferTableWidget(widgets.SelectMultiple):
@ -134,7 +134,7 @@ class TransferTableWidget(widgets.SelectMultiple):
open_tag = format_html('<d-table {}>', flatatt(final_attrs))
output = [open_tag, options, '</d-table>']
output = [open_tag, options, mark_safe('</d-table>')]
return mark_safe('\n'.join(output))

View File

@ -167,7 +167,9 @@ class UpdatePolicyActionForm(BaseUpdateForm):
self.fields['name'].initial = pa.name
self.fields['description'].initial = pa.description
self.fields['shared'].initial = pa.shared
except Exception:
except KeyError:
pass
except AttributeError:
pass
def handle(self, request, context):

View File

@ -460,7 +460,9 @@ class AddConsumedPRSForm(forms.SelfHandlingForm):
policy_rule_sets = [
(p.id, p.name) for p in items
if p.id not in consumedpolicy_rule_sets]
except Exception:
except AttributeError:
pass
except KeyError:
pass
self.fields['policy_rule_set'].choices = policy_rule_sets
@ -508,7 +510,9 @@ class ExtAddConsumedPRSForm(forms.SelfHandlingForm):
policy_rule_sets = [
(p.id, p.name) for p in items
if p.id not in consumedpolicy_rule_sets]
except Exception:
except AttributeError:
pass
except KeyError:
pass
self.fields['policy_rule_set'].choices = policy_rule_sets
@ -557,7 +561,9 @@ class RemoveConsumedPRSForm(forms.SelfHandlingForm):
policy_rule_sets = [(p.id, p.name)
for p in items if p.id
in consumedpolicy_rule_sets]
except Exception:
except AttributeError:
pass
except KeyError:
pass
self.fields['policy_rule_set'].choices = policy_rule_sets
@ -605,7 +611,9 @@ class ExtRemoveConsumedPRSForm(forms.SelfHandlingForm):
policy_rule_sets = [(p.id, p.name)
for p in items if p.id
in consumedpolicy_rule_sets]
except Exception:
except AttributeError:
pass
except KeyError:
pass
self.fields['policy_rule_set'].choices = policy_rule_sets

View File

@ -66,7 +66,9 @@ class PTGDetailsView(tabs.TabbedTableView):
policy_target = client.policy_target_get(
self.request, context['policy_target_id'])
context['policy_target'] = policy_target
except Exception:
except AttributeError:
pass
except KeyError:
pass
return context
@ -83,7 +85,9 @@ class ExternalPTGDetailsView(tabs.TabbedTableView):
ext_policy_target = client.ext_policy_target_get(
self.request, context['ext_policy_target_id'])
context['policy_target'] = ext_policy_target
except Exception:
except AttributeError:
pass
except KeyError:
pass
return context

View File

@ -592,7 +592,8 @@ class LaunchInstance(workflows.Workflow):
try:
subnet = api.neutron.subnet_get(
request, subnet_id)
except Exception:
except Exception as e:
LOG.warning(str(e))
continue
if IPAddress(fixed_ip) in \
IPNetwork(subnet['cidr']):

View File

@ -26,7 +26,7 @@ from __future__ import print_function
import optparse
import os
import subprocess
import subprocess # nosec
import sys
@ -61,7 +61,7 @@ class InstallVenv(object):
else:
stdout = None
proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout)
proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout) # nosec
output = proc.communicate()[0]
if check_exit_code and proc.returncode != 0:
self.die('Command "%s" failed.\n%s', ' '.join(cmd), output)