Address static analysis issues
This patch is meant to address issues found by running the bandit
static analysis tool. Some of the issues are valid vulnerabilities,
while others are false positives. For false positives, the 'nosec'
keyword has been added to allow bandit checks to pass.
Change-Id: Iaa3375f5031e7b86f3d0d54c27cf8f8fc30c90a4
(cherry picked from commit c386d4167c
)
This commit is contained in:
parent
8c956fe3d8
commit
1abd42d30a
@ -11,6 +11,8 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
|
from django.utils.html import format_html
|
||||||
|
from django.utils.html import format_html_join
|
||||||
from django.utils.safestring import mark_safe
|
from django.utils.safestring import mark_safe
|
||||||
|
|
||||||
from gbpui import client
|
from gbpui import client
|
||||||
@ -26,12 +28,12 @@ def update_pruleset_attributes(request, prset):
|
|||||||
rules = prset.policy_rules
|
rules = prset.policy_rules
|
||||||
url = "horizon:project:application_policy:policyruledetails"
|
url = "horizon:project:application_policy:policyruledetails"
|
||||||
value = ["<ul>"]
|
value = ["<ul>"]
|
||||||
li = lambda x: "<li><a href='" + \
|
|
||||||
reverse(url, kwargs={'policyrule_id': x.id}) + \
|
|
||||||
"'>" + x.name + "</a></li>"
|
|
||||||
for rule in rules:
|
for rule in rules:
|
||||||
r = client.policyrule_get(request, rule)
|
r = client.policyrule_get(request, rule)
|
||||||
value.append(li(r))
|
li = format_html("<li><a href='{}'>{}</a></li>",
|
||||||
|
reverse(url, kwargs={'policyrule_id': r.id}),
|
||||||
|
r.name)
|
||||||
|
value.append(li)
|
||||||
value.append("</ul>")
|
value.append("</ul>")
|
||||||
value = "".join(value)
|
value = "".join(value)
|
||||||
setattr(prset, 'policy_rules', mark_safe(value))
|
setattr(prset, 'policy_rules', mark_safe(value))
|
||||||
@ -44,10 +46,10 @@ def update_service_policy_attributes(policy):
|
|||||||
if len(np) > 0:
|
if len(np) > 0:
|
||||||
tags = []
|
tags = []
|
||||||
for item in np:
|
for item in np:
|
||||||
dl = ["<dl class='dl-horizontal'>"]
|
dl = [mark_safe("<dl class='dl-horizontal'>")]
|
||||||
dl.extend(["<dt>%s<dt><dd>%s</dd>" %
|
dl.extend(format_html_join('', "<dt>{}<dt><dd>{}</dd>",
|
||||||
(k, v) for k, v in list(item.items())])
|
((k, v) for k, v in list(item.items()))))
|
||||||
dl.append("</dl>")
|
dl.append(mark_safe("</dl>"))
|
||||||
tags.append("".join(dl))
|
tags.append("".join(dl))
|
||||||
params = mark_safe("".join(tags))
|
params = mark_safe("".join(tags))
|
||||||
setattr(policy, 'network_service_params', params)
|
setattr(policy, 'network_service_params', params)
|
||||||
@ -61,16 +63,15 @@ def update_policy_target_attributes(request, pt):
|
|||||||
provided = [client.policy_rule_set_get(request, item) for item in provided]
|
provided = [client.policy_rule_set_get(request, item) for item in provided]
|
||||||
consumed = [client.policy_rule_set_get(request, item) for item in consumed]
|
consumed = [client.policy_rule_set_get(request, item) for item in consumed]
|
||||||
p = ["<ul>"]
|
p = ["<ul>"]
|
||||||
li = lambda x: "<li><a href='" + \
|
li = lambda url, item: (format_html("<li><a href='{}'>{}</a></li>",
|
||||||
reverse(url, kwargs={'policy_rule_set_id': x.id}) + \
|
reverse(url, kwargs={'policy_rule_set_id': item.id}), item.name))
|
||||||
"'>" + x.name + "</a></li>"
|
|
||||||
for item in provided:
|
for item in provided:
|
||||||
p.append(li(item))
|
p.append(li(url, item))
|
||||||
p.append("</ul>")
|
p.append("</ul>")
|
||||||
p = "".join(p)
|
p = "".join(p)
|
||||||
c = ["<ul>"]
|
c = ["<ul>"]
|
||||||
for item in consumed:
|
for item in consumed:
|
||||||
c.append(li(item))
|
c.append(li(url, item))
|
||||||
c.append("</ul>")
|
c.append("</ul>")
|
||||||
c = "".join(c)
|
c = "".join(c)
|
||||||
consumed = [item.name for item in consumed]
|
consumed = [item.name for item in consumed]
|
||||||
@ -80,15 +81,14 @@ def update_policy_target_attributes(request, pt):
|
|||||||
if hasattr(pt, 'l2_policy_id') and pt.l2_policy_id is not None:
|
if hasattr(pt, 'l2_policy_id') and pt.l2_policy_id is not None:
|
||||||
policy = client.l2policy_get(request, pt.l2_policy_id)
|
policy = client.l2policy_get(request, pt.l2_policy_id)
|
||||||
u = reverse(l2url, kwargs={'l2policy_id': policy.id})
|
u = reverse(l2url, kwargs={'l2policy_id': policy.id})
|
||||||
atag = mark_safe(
|
atag = format_html("<a href='{}'>{}</a>", u, policy.name)
|
||||||
"<a href='" + u + "'>" + policy.name + "</a>")
|
|
||||||
setattr(pt, 'l2_policy_id', atag)
|
setattr(pt, 'l2_policy_id', atag)
|
||||||
if hasattr(pt, 'external_segments'):
|
if hasattr(pt, 'external_segments'):
|
||||||
exturl = "horizon:project:network_policy:external_connectivity_details"
|
exturl = "horizon:project:network_policy:external_connectivity_details"
|
||||||
value = ["<ul>"]
|
value = ["<ul>"]
|
||||||
li = lambda x: "<li><a href='" + \
|
li = lambda x: format_html("<li><a href='{}'>{}</a></li>",
|
||||||
reverse(exturl, kwargs={'external_connectivity_id': x.id}) + \
|
reverse(exturl, kwargs={'external_connectivity_id': x.id}),
|
||||||
"'>" + x.name + "</a></li>"
|
x.name)
|
||||||
for external_segment in pt.external_segments:
|
for external_segment in pt.external_segments:
|
||||||
ext_policy = client.get_externalconnectivity(request,
|
ext_policy = client.get_externalconnectivity(request,
|
||||||
external_segment)
|
external_segment)
|
||||||
@ -104,17 +104,17 @@ def update_policyrule_attributes(request, prule):
|
|||||||
classifier_id = prule.policy_classifier_id
|
classifier_id = prule.policy_classifier_id
|
||||||
classifier = client.policyclassifier_get(request, classifier_id)
|
classifier = client.policyclassifier_get(request, classifier_id)
|
||||||
u = reverse(url, kwargs={'policyclassifier_id': classifier.id})
|
u = reverse(url, kwargs={'policyclassifier_id': classifier.id})
|
||||||
tag = mark_safe("<a href='" + u + "'>" + classifier.name + "</a>")
|
tag = format_html("<a href='{}'>{}</a>", u, classifier.name)
|
||||||
setattr(prule, 'policy_classifier_id', tag)
|
setattr(prule, 'policy_classifier_id', tag)
|
||||||
actions = prule.policy_actions
|
actions = prule.policy_actions
|
||||||
action_url = "horizon:project:application_policy:policyactiondetails"
|
action_url = "horizon:project:application_policy:policyactiondetails"
|
||||||
ul = ["<ul>"]
|
ul = [mark_safe("<ul>")]
|
||||||
for a in actions:
|
for a in actions:
|
||||||
action = client.policyaction_get(request, a)
|
action = client.policyaction_get(request, a)
|
||||||
u = reverse(action_url, kwargs={'policyaction_id': a})
|
u = reverse(action_url, kwargs={'policyaction_id': a})
|
||||||
li = "<li><a href='%s'>%s</a></li>" % (u, action.name)
|
li = format_html("<li><a href='%s'>%s</a></li>", u, action.name)
|
||||||
ul.append(li)
|
ul.append(li)
|
||||||
ul.append("</ul>")
|
ul.append(mark_safe("</ul>"))
|
||||||
ultag = "".join(ul)
|
ultag = "".join(ul)
|
||||||
setattr(prule, 'policy_actions', mark_safe(ultag))
|
setattr(prule, 'policy_actions', mark_safe(ultag))
|
||||||
return prule
|
return prule
|
||||||
@ -143,17 +143,15 @@ def update_classifier_attributes(classifiers):
|
|||||||
def update_l3_policy_attributes(request, l3_policy):
|
def update_l3_policy_attributes(request, l3_policy):
|
||||||
url = "horizon:project:network_policy:external_connectivity_details"
|
url = "horizon:project:network_policy:external_connectivity_details"
|
||||||
if bool(l3_policy.external_segments):
|
if bool(l3_policy.external_segments):
|
||||||
value = ["<ul>"]
|
value = [mark_safe("<ul>")]
|
||||||
li = \
|
li = lambda x: format_html("<li><a href='{}'>{}</a> : {}</li>",
|
||||||
lambda x: "<li><a href='" + \
|
reverse(url, kwargs={'external_connectivity_id': x.id}),
|
||||||
reverse(url, kwargs={'external_connectivity_id': x.id}) + \
|
x.name, l3_policy.external_segments[x.id][0])
|
||||||
"'>" + x.name + "</a>" + " : " + \
|
|
||||||
l3_policy.external_segments[x.id][0] + "</li>"
|
|
||||||
for ec in list(l3_policy.external_segments.keys()):
|
for ec in list(l3_policy.external_segments.keys()):
|
||||||
external_connectivity = client.get_externalconnectivity(request,
|
external_connectivity = client.get_externalconnectivity(request,
|
||||||
ec)
|
ec)
|
||||||
value.append(li(external_connectivity))
|
value.append(li(external_connectivity))
|
||||||
value.append("</ul>")
|
value.append(mark_safe("</ul>"))
|
||||||
tag = mark_safe("".join(value))
|
tag = mark_safe("".join(value))
|
||||||
else:
|
else:
|
||||||
tag = '-'
|
tag = '-'
|
||||||
@ -164,15 +162,13 @@ def update_l3_policy_attributes(request, l3_policy):
|
|||||||
def update_nat_pool_attributes(request, nat_pool):
|
def update_nat_pool_attributes(request, nat_pool):
|
||||||
url = "horizon:project:network_policy:external_connectivity_details"
|
url = "horizon:project:network_policy:external_connectivity_details"
|
||||||
id = nat_pool.external_segment_id
|
id = nat_pool.external_segment_id
|
||||||
value = ["<ul>"]
|
value = [mark_safe("<ul>")]
|
||||||
li = \
|
li = lambda x: format_html("<li><a href='{}'>{}</a></li>",
|
||||||
lambda x: "<li><a href='" + \
|
reverse(url, kwargs={'external_connectivity_id': x.id}), x.name)
|
||||||
reverse(url, kwargs={'external_connectivity_id': x.id}) + \
|
|
||||||
"'>" + x.name + "</a>" + "</li>"
|
|
||||||
external_connectivity = client.get_externalconnectivity(request,
|
external_connectivity = client.get_externalconnectivity(request,
|
||||||
id)
|
id)
|
||||||
value.append(li(external_connectivity))
|
value.append(li(external_connectivity))
|
||||||
value.append("</ul>")
|
value.append(mark_safe("</ul>"))
|
||||||
tag = mark_safe("".join(value))
|
tag = mark_safe("".join(value))
|
||||||
setattr(nat_pool, 'external_segment_id', tag)
|
setattr(nat_pool, 'external_segment_id', tag)
|
||||||
return nat_pool
|
return nat_pool
|
||||||
|
@ -88,11 +88,11 @@ class DropdownEditWidget(TextInput):
|
|||||||
def render(self, name, value, attrs=None):
|
def render(self, name, value, attrs=None):
|
||||||
text_html = super(DropdownEditWidget, self).render(
|
text_html = super(DropdownEditWidget, self).render(
|
||||||
name, value, attrs=attrs)
|
name, value, attrs=attrs)
|
||||||
data_list = '<datalist id="list__%s">' % self._name
|
data_list = [format_html('<datalist id="list__{}">', self._name)]
|
||||||
for item in self._list:
|
for item in self._list:
|
||||||
data_list += '<option value="%s">' % item
|
data_list.append(format_html('<option value="{}">', item))
|
||||||
data_list += '</datalist>'
|
data_list.append(mark_safe('</datalist>'))
|
||||||
return mark_safe(text_html + data_list)
|
return mark_safe(text_html + mark_safe("".join(data_list)))
|
||||||
|
|
||||||
|
|
||||||
class TransferTableWidget(widgets.SelectMultiple):
|
class TransferTableWidget(widgets.SelectMultiple):
|
||||||
@ -134,7 +134,7 @@ class TransferTableWidget(widgets.SelectMultiple):
|
|||||||
|
|
||||||
open_tag = format_html('<d-table {}>', flatatt(final_attrs))
|
open_tag = format_html('<d-table {}>', flatatt(final_attrs))
|
||||||
|
|
||||||
output = [open_tag, options, '</d-table>']
|
output = [open_tag, options, mark_safe('</d-table>')]
|
||||||
|
|
||||||
return mark_safe('\n'.join(output))
|
return mark_safe('\n'.join(output))
|
||||||
|
|
||||||
|
@ -167,7 +167,9 @@ class UpdatePolicyActionForm(BaseUpdateForm):
|
|||||||
self.fields['name'].initial = pa.name
|
self.fields['name'].initial = pa.name
|
||||||
self.fields['description'].initial = pa.description
|
self.fields['description'].initial = pa.description
|
||||||
self.fields['shared'].initial = pa.shared
|
self.fields['shared'].initial = pa.shared
|
||||||
except Exception:
|
except KeyError:
|
||||||
|
pass
|
||||||
|
except AttributeError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def handle(self, request, context):
|
def handle(self, request, context):
|
||||||
|
@ -460,7 +460,9 @@ class AddConsumedPRSForm(forms.SelfHandlingForm):
|
|||||||
policy_rule_sets = [
|
policy_rule_sets = [
|
||||||
(p.id, p.name) for p in items
|
(p.id, p.name) for p in items
|
||||||
if p.id not in consumedpolicy_rule_sets]
|
if p.id not in consumedpolicy_rule_sets]
|
||||||
except Exception:
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
self.fields['policy_rule_set'].choices = policy_rule_sets
|
self.fields['policy_rule_set'].choices = policy_rule_sets
|
||||||
|
|
||||||
@ -508,7 +510,9 @@ class ExtAddConsumedPRSForm(forms.SelfHandlingForm):
|
|||||||
policy_rule_sets = [
|
policy_rule_sets = [
|
||||||
(p.id, p.name) for p in items
|
(p.id, p.name) for p in items
|
||||||
if p.id not in consumedpolicy_rule_sets]
|
if p.id not in consumedpolicy_rule_sets]
|
||||||
except Exception:
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
self.fields['policy_rule_set'].choices = policy_rule_sets
|
self.fields['policy_rule_set'].choices = policy_rule_sets
|
||||||
|
|
||||||
@ -557,7 +561,9 @@ class RemoveConsumedPRSForm(forms.SelfHandlingForm):
|
|||||||
policy_rule_sets = [(p.id, p.name)
|
policy_rule_sets = [(p.id, p.name)
|
||||||
for p in items if p.id
|
for p in items if p.id
|
||||||
in consumedpolicy_rule_sets]
|
in consumedpolicy_rule_sets]
|
||||||
except Exception:
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
self.fields['policy_rule_set'].choices = policy_rule_sets
|
self.fields['policy_rule_set'].choices = policy_rule_sets
|
||||||
|
|
||||||
@ -605,7 +611,9 @@ class ExtRemoveConsumedPRSForm(forms.SelfHandlingForm):
|
|||||||
policy_rule_sets = [(p.id, p.name)
|
policy_rule_sets = [(p.id, p.name)
|
||||||
for p in items if p.id
|
for p in items if p.id
|
||||||
in consumedpolicy_rule_sets]
|
in consumedpolicy_rule_sets]
|
||||||
except Exception:
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
self.fields['policy_rule_set'].choices = policy_rule_sets
|
self.fields['policy_rule_set'].choices = policy_rule_sets
|
||||||
|
|
||||||
|
@ -66,7 +66,9 @@ class PTGDetailsView(tabs.TabbedTableView):
|
|||||||
policy_target = client.policy_target_get(
|
policy_target = client.policy_target_get(
|
||||||
self.request, context['policy_target_id'])
|
self.request, context['policy_target_id'])
|
||||||
context['policy_target'] = policy_target
|
context['policy_target'] = policy_target
|
||||||
except Exception:
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
return context
|
return context
|
||||||
|
|
||||||
@ -83,7 +85,9 @@ class ExternalPTGDetailsView(tabs.TabbedTableView):
|
|||||||
ext_policy_target = client.ext_policy_target_get(
|
ext_policy_target = client.ext_policy_target_get(
|
||||||
self.request, context['ext_policy_target_id'])
|
self.request, context['ext_policy_target_id'])
|
||||||
context['policy_target'] = ext_policy_target
|
context['policy_target'] = ext_policy_target
|
||||||
except Exception:
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
return context
|
return context
|
||||||
|
|
||||||
|
@ -592,7 +592,8 @@ class LaunchInstance(workflows.Workflow):
|
|||||||
try:
|
try:
|
||||||
subnet = api.neutron.subnet_get(
|
subnet = api.neutron.subnet_get(
|
||||||
request, subnet_id)
|
request, subnet_id)
|
||||||
except Exception:
|
except Exception as e:
|
||||||
|
LOG.warning(str(e))
|
||||||
continue
|
continue
|
||||||
if IPAddress(fixed_ip) in \
|
if IPAddress(fixed_ip) in \
|
||||||
IPNetwork(subnet['cidr']):
|
IPNetwork(subnet['cidr']):
|
||||||
|
@ -26,7 +26,7 @@ from __future__ import print_function
|
|||||||
|
|
||||||
import optparse
|
import optparse
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess # nosec
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
|
||||||
@ -61,7 +61,7 @@ class InstallVenv(object):
|
|||||||
else:
|
else:
|
||||||
stdout = None
|
stdout = None
|
||||||
|
|
||||||
proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout)
|
proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout) # nosec
|
||||||
output = proc.communicate()[0]
|
output = proc.communicate()[0]
|
||||||
if check_exit_code and proc.returncode != 0:
|
if check_exit_code and proc.returncode != 0:
|
||||||
self.die('Command "%s" failed.\n%s', ' '.join(cmd), output)
|
self.die('Command "%s" failed.\n%s', ' '.join(cmd), output)
|
||||||
|
Loading…
Reference in New Issue
Block a user