Address static analysis issues

This patch is meant to address issues found by running the bandit
static analysis tool. Some of the issues are valid vulnerabilities,
while others are false positives. For false positives, the 'nosec'
keyword has been added to allow bandit checks to pass.

Change-Id: Iaa3375f5031e7b86f3d0d54c27cf8f8fc30c90a4
This commit is contained in:
Thomas Bachman 2024-05-23 12:13:06 +00:00
parent 56e6784a85
commit c386d4167c
7 changed files with 61 additions and 50 deletions

View File

@ -11,6 +11,8 @@
# under the License. # under the License.
from django.urls import reverse from django.urls import reverse
from django.utils.html import format_html
from django.utils.html import format_html_join
from django.utils.safestring import mark_safe from django.utils.safestring import mark_safe
from gbpui import client from gbpui import client
@ -26,12 +28,12 @@ def update_pruleset_attributes(request, prset):
rules = prset.policy_rules rules = prset.policy_rules
url = "horizon:project:application_policy:policyruledetails" url = "horizon:project:application_policy:policyruledetails"
value = ["<ul>"] value = ["<ul>"]
li = lambda x: "<li><a href='" + \
reverse(url, kwargs={'policyrule_id': x.id}) + \
"'>" + x.name + "</a></li>"
for rule in rules: for rule in rules:
r = client.policyrule_get(request, rule) r = client.policyrule_get(request, rule)
value.append(li(r)) li = format_html("<li><a href='{}'>{}</a></li>",
reverse(url, kwargs={'policyrule_id': r.id}),
r.name)
value.append(li)
value.append("</ul>") value.append("</ul>")
value = "".join(value) value = "".join(value)
setattr(prset, 'policy_rules', mark_safe(value)) setattr(prset, 'policy_rules', mark_safe(value))
@ -44,10 +46,10 @@ def update_service_policy_attributes(policy):
if len(np) > 0: if len(np) > 0:
tags = [] tags = []
for item in np: for item in np:
dl = ["<dl class='dl-horizontal'>"] dl = [mark_safe("<dl class='dl-horizontal'>")]
dl.extend(["<dt>%s<dt><dd>%s</dd>" % dl.extend(format_html_join('', "<dt>{}<dt><dd>{}</dd>",
(k, v) for k, v in list(item.items())]) ((k, v) for k, v in list(item.items()))))
dl.append("</dl>") dl.append(mark_safe("</dl>"))
tags.append("".join(dl)) tags.append("".join(dl))
params = mark_safe("".join(tags)) params = mark_safe("".join(tags))
setattr(policy, 'network_service_params', params) setattr(policy, 'network_service_params', params)
@ -61,16 +63,15 @@ def update_policy_target_attributes(request, pt):
provided = [client.policy_rule_set_get(request, item) for item in provided] provided = [client.policy_rule_set_get(request, item) for item in provided]
consumed = [client.policy_rule_set_get(request, item) for item in consumed] consumed = [client.policy_rule_set_get(request, item) for item in consumed]
p = ["<ul>"] p = ["<ul>"]
li = lambda x: "<li><a href='" + \ li = lambda url, item: (format_html("<li><a href='{}'>{}</a></li>",
reverse(url, kwargs={'policy_rule_set_id': x.id}) + \ reverse(url, kwargs={'policy_rule_set_id': item.id}), item.name))
"'>" + x.name + "</a></li>"
for item in provided: for item in provided:
p.append(li(item)) p.append(li(url, item))
p.append("</ul>") p.append("</ul>")
p = "".join(p) p = "".join(p)
c = ["<ul>"] c = ["<ul>"]
for item in consumed: for item in consumed:
c.append(li(item)) c.append(li(url, item))
c.append("</ul>") c.append("</ul>")
c = "".join(c) c = "".join(c)
consumed = [item.name for item in consumed] consumed = [item.name for item in consumed]
@ -80,15 +81,14 @@ def update_policy_target_attributes(request, pt):
if hasattr(pt, 'l2_policy_id') and pt.l2_policy_id is not None: if hasattr(pt, 'l2_policy_id') and pt.l2_policy_id is not None:
policy = client.l2policy_get(request, pt.l2_policy_id) policy = client.l2policy_get(request, pt.l2_policy_id)
u = reverse(l2url, kwargs={'l2policy_id': policy.id}) u = reverse(l2url, kwargs={'l2policy_id': policy.id})
atag = mark_safe( atag = format_html("<a href='{}'>{}</a>", u, policy.name)
"<a href='" + u + "'>" + policy.name + "</a>")
setattr(pt, 'l2_policy_id', atag) setattr(pt, 'l2_policy_id', atag)
if hasattr(pt, 'external_segments'): if hasattr(pt, 'external_segments'):
exturl = "horizon:project:network_policy:external_connectivity_details" exturl = "horizon:project:network_policy:external_connectivity_details"
value = ["<ul>"] value = ["<ul>"]
li = lambda x: "<li><a href='" + \ li = lambda x: format_html("<li><a href='{}'>{}</a></li>",
reverse(exturl, kwargs={'external_connectivity_id': x.id}) + \ reverse(exturl, kwargs={'external_connectivity_id': x.id}),
"'>" + x.name + "</a></li>" x.name)
for external_segment in pt.external_segments: for external_segment in pt.external_segments:
ext_policy = client.get_externalconnectivity(request, ext_policy = client.get_externalconnectivity(request,
external_segment) external_segment)
@ -104,17 +104,17 @@ def update_policyrule_attributes(request, prule):
classifier_id = prule.policy_classifier_id classifier_id = prule.policy_classifier_id
classifier = client.policyclassifier_get(request, classifier_id) classifier = client.policyclassifier_get(request, classifier_id)
u = reverse(url, kwargs={'policyclassifier_id': classifier.id}) u = reverse(url, kwargs={'policyclassifier_id': classifier.id})
tag = mark_safe("<a href='" + u + "'>" + classifier.name + "</a>") tag = format_html("<a href='{}'>{}</a>", u, classifier.name)
setattr(prule, 'policy_classifier_id', tag) setattr(prule, 'policy_classifier_id', tag)
actions = prule.policy_actions actions = prule.policy_actions
action_url = "horizon:project:application_policy:policyactiondetails" action_url = "horizon:project:application_policy:policyactiondetails"
ul = ["<ul>"] ul = [mark_safe("<ul>")]
for a in actions: for a in actions:
action = client.policyaction_get(request, a) action = client.policyaction_get(request, a)
u = reverse(action_url, kwargs={'policyaction_id': a}) u = reverse(action_url, kwargs={'policyaction_id': a})
li = "<li><a href='%s'>%s</a></li>" % (u, action.name) li = format_html("<li><a href='%s'>%s</a></li>", u, action.name)
ul.append(li) ul.append(li)
ul.append("</ul>") ul.append(mark_safe("</ul>"))
ultag = "".join(ul) ultag = "".join(ul)
setattr(prule, 'policy_actions', mark_safe(ultag)) setattr(prule, 'policy_actions', mark_safe(ultag))
return prule return prule
@ -143,17 +143,15 @@ def update_classifier_attributes(classifiers):
def update_l3_policy_attributes(request, l3_policy): def update_l3_policy_attributes(request, l3_policy):
url = "horizon:project:network_policy:external_connectivity_details" url = "horizon:project:network_policy:external_connectivity_details"
if bool(l3_policy.external_segments): if bool(l3_policy.external_segments):
value = ["<ul>"] value = [mark_safe("<ul>")]
li = \ li = lambda x: format_html("<li><a href='{}'>{}</a> : {}</li>",
lambda x: "<li><a href='" + \ reverse(url, kwargs={'external_connectivity_id': x.id}),
reverse(url, kwargs={'external_connectivity_id': x.id}) + \ x.name, l3_policy.external_segments[x.id][0])
"'>" + x.name + "</a>" + " : " + \
l3_policy.external_segments[x.id][0] + "</li>"
for ec in list(l3_policy.external_segments.keys()): for ec in list(l3_policy.external_segments.keys()):
external_connectivity = client.get_externalconnectivity(request, external_connectivity = client.get_externalconnectivity(request,
ec) ec)
value.append(li(external_connectivity)) value.append(li(external_connectivity))
value.append("</ul>") value.append(mark_safe("</ul>"))
tag = mark_safe("".join(value)) tag = mark_safe("".join(value))
else: else:
tag = '-' tag = '-'
@ -164,15 +162,13 @@ def update_l3_policy_attributes(request, l3_policy):
def update_nat_pool_attributes(request, nat_pool): def update_nat_pool_attributes(request, nat_pool):
url = "horizon:project:network_policy:external_connectivity_details" url = "horizon:project:network_policy:external_connectivity_details"
id = nat_pool.external_segment_id id = nat_pool.external_segment_id
value = ["<ul>"] value = [mark_safe("<ul>")]
li = \ li = lambda x: format_html("<li><a href='{}'>{}</a></li>",
lambda x: "<li><a href='" + \ reverse(url, kwargs={'external_connectivity_id': x.id}), x.name)
reverse(url, kwargs={'external_connectivity_id': x.id}) + \
"'>" + x.name + "</a>" + "</li>"
external_connectivity = client.get_externalconnectivity(request, external_connectivity = client.get_externalconnectivity(request,
id) id)
value.append(li(external_connectivity)) value.append(li(external_connectivity))
value.append("</ul>") value.append(mark_safe("</ul>"))
tag = mark_safe("".join(value)) tag = mark_safe("".join(value))
setattr(nat_pool, 'external_segment_id', tag) setattr(nat_pool, 'external_segment_id', tag)
return nat_pool return nat_pool

View File

@ -88,11 +88,11 @@ class DropdownEditWidget(TextInput):
def render(self, name, value, attrs=None): def render(self, name, value, attrs=None):
text_html = super(DropdownEditWidget, self).render( text_html = super(DropdownEditWidget, self).render(
name, value, attrs=attrs) name, value, attrs=attrs)
data_list = '<datalist id="list__%s">' % self._name data_list = [format_html('<datalist id="list__{}">', self._name)]
for item in self._list: for item in self._list:
data_list += '<option value="%s">' % item data_list.append(format_html('<option value="{}">', item))
data_list += '</datalist>' data_list.append(mark_safe('</datalist>'))
return mark_safe(text_html + data_list) return mark_safe(text_html + mark_safe("".join(data_list)))
class TransferTableWidget(widgets.SelectMultiple): class TransferTableWidget(widgets.SelectMultiple):
@ -134,7 +134,7 @@ class TransferTableWidget(widgets.SelectMultiple):
open_tag = format_html('<d-table {}>', flatatt(final_attrs)) open_tag = format_html('<d-table {}>', flatatt(final_attrs))
output = [open_tag, options, '</d-table>'] output = [open_tag, options, mark_safe('</d-table>')]
return mark_safe('\n'.join(output)) return mark_safe('\n'.join(output))

View File

@ -167,7 +167,9 @@ class UpdatePolicyActionForm(BaseUpdateForm):
self.fields['name'].initial = pa.name self.fields['name'].initial = pa.name
self.fields['description'].initial = pa.description self.fields['description'].initial = pa.description
self.fields['shared'].initial = pa.shared self.fields['shared'].initial = pa.shared
except Exception: except KeyError:
pass
except AttributeError:
pass pass
def handle(self, request, context): def handle(self, request, context):

View File

@ -460,7 +460,9 @@ class AddConsumedPRSForm(forms.SelfHandlingForm):
policy_rule_sets = [ policy_rule_sets = [
(p.id, p.name) for p in items (p.id, p.name) for p in items
if p.id not in consumedpolicy_rule_sets] if p.id not in consumedpolicy_rule_sets]
except Exception: except AttributeError:
pass
except KeyError:
pass pass
self.fields['policy_rule_set'].choices = policy_rule_sets self.fields['policy_rule_set'].choices = policy_rule_sets
@ -508,7 +510,9 @@ class ExtAddConsumedPRSForm(forms.SelfHandlingForm):
policy_rule_sets = [ policy_rule_sets = [
(p.id, p.name) for p in items (p.id, p.name) for p in items
if p.id not in consumedpolicy_rule_sets] if p.id not in consumedpolicy_rule_sets]
except Exception: except AttributeError:
pass
except KeyError:
pass pass
self.fields['policy_rule_set'].choices = policy_rule_sets self.fields['policy_rule_set'].choices = policy_rule_sets
@ -557,7 +561,9 @@ class RemoveConsumedPRSForm(forms.SelfHandlingForm):
policy_rule_sets = [(p.id, p.name) policy_rule_sets = [(p.id, p.name)
for p in items if p.id for p in items if p.id
in consumedpolicy_rule_sets] in consumedpolicy_rule_sets]
except Exception: except AttributeError:
pass
except KeyError:
pass pass
self.fields['policy_rule_set'].choices = policy_rule_sets self.fields['policy_rule_set'].choices = policy_rule_sets
@ -605,7 +611,9 @@ class ExtRemoveConsumedPRSForm(forms.SelfHandlingForm):
policy_rule_sets = [(p.id, p.name) policy_rule_sets = [(p.id, p.name)
for p in items if p.id for p in items if p.id
in consumedpolicy_rule_sets] in consumedpolicy_rule_sets]
except Exception: except AttributeError:
pass
except KeyError:
pass pass
self.fields['policy_rule_set'].choices = policy_rule_sets self.fields['policy_rule_set'].choices = policy_rule_sets

View File

@ -66,7 +66,9 @@ class PTGDetailsView(tabs.TabbedTableView):
policy_target = client.policy_target_get( policy_target = client.policy_target_get(
self.request, context['policy_target_id']) self.request, context['policy_target_id'])
context['policy_target'] = policy_target context['policy_target'] = policy_target
except Exception: except AttributeError:
pass
except KeyError:
pass pass
return context return context
@ -83,7 +85,9 @@ class ExternalPTGDetailsView(tabs.TabbedTableView):
ext_policy_target = client.ext_policy_target_get( ext_policy_target = client.ext_policy_target_get(
self.request, context['ext_policy_target_id']) self.request, context['ext_policy_target_id'])
context['policy_target'] = ext_policy_target context['policy_target'] = ext_policy_target
except Exception: except AttributeError:
pass
except KeyError:
pass pass
return context return context

View File

@ -592,7 +592,8 @@ class LaunchInstance(workflows.Workflow):
try: try:
subnet = api.neutron.subnet_get( subnet = api.neutron.subnet_get(
request, subnet_id) request, subnet_id)
except Exception: except Exception as e:
LOG.warning(str(e))
continue continue
if IPAddress(fixed_ip) in \ if IPAddress(fixed_ip) in \
IPNetwork(subnet['cidr']): IPNetwork(subnet['cidr']):

View File

@ -26,7 +26,7 @@ from __future__ import print_function
import optparse import optparse
import os import os
import subprocess import subprocess # nosec
import sys import sys
@ -61,7 +61,7 @@ class InstallVenv(object):
else: else:
stdout = None stdout = None
proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout) proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout) # nosec
output = proc.communicate()[0] output = proc.communicate()[0]
if check_exit_code and proc.returncode != 0: if check_exit_code and proc.returncode != 0:
self.die('Command "%s" failed.\n%s', ' '.join(cmd), output) self.die('Command "%s" failed.\n%s', ' '.join(cmd), output)