Enable try_except_pass Bandit test
The try_except_pass test checks that pass isn't used in an except block because it's the source of lots of security issues. The current instances of pass in an except block are marked as nosec. Change-Id: I73af6b80fa75286e96943026b7b71ad23dc1786b
This commit is contained in:
parent
61397486a3
commit
068cba0047
@ -81,9 +81,7 @@ profiles:
|
||||
- ssl_with_bad_defaults
|
||||
- ssl_with_bad_version
|
||||
- ssl_with_no_version
|
||||
|
||||
# TODO:
|
||||
# - try_except_pass
|
||||
- try_except_pass
|
||||
|
||||
# Keystone has no use for mako.
|
||||
# - use_of_mako_templates
|
||||
|
@ -125,8 +125,8 @@ class Assignment(keystone_assignment.AssignmentDriverV8):
|
||||
target_id=project_id or domain_id,
|
||||
role_id=role_id,
|
||||
inherited=inherited_to_projects))
|
||||
except sql.DBDuplicateEntry:
|
||||
# The v3 grant APIs are silent if the assignment already exists
|
||||
except sql.DBDuplicateEntry: # nosec : The v3 grant APIs are silent if
|
||||
# the assignment already exists
|
||||
pass
|
||||
|
||||
def list_grant_role_ids(self, user_id=None, group_id=None,
|
||||
|
@ -111,7 +111,8 @@ class Manager(manager.Manager):
|
||||
tenant_id=project_ref['id'])
|
||||
role_list = self._roles_from_role_dicts(
|
||||
metadata_ref.get('roles', {}), False)
|
||||
except exception.MetadataNotFound:
|
||||
except exception.MetadataNotFound: # nosec: No metadata so no
|
||||
# roles.
|
||||
pass
|
||||
|
||||
if CONF.os_inherit.enabled:
|
||||
@ -121,7 +122,10 @@ class Manager(manager.Manager):
|
||||
user_id=user_id, domain_id=project_ref['domain_id'])
|
||||
role_list += self._roles_from_role_dicts(
|
||||
metadata_ref.get('roles', {}), True)
|
||||
except (exception.MetadataNotFound, exception.NotImplemented):
|
||||
except (exception.MetadataNotFound, # nosec : No metadata or
|
||||
# the backend doesn't support the role ops, so no
|
||||
# roles.
|
||||
exception.NotImplemented):
|
||||
pass
|
||||
# As well inherited roles from parent projects
|
||||
for p in self.resource_api.list_project_parents(
|
||||
@ -157,7 +161,8 @@ class Manager(manager.Manager):
|
||||
domain_id=domain_id)
|
||||
role_list += self._roles_from_role_dicts(
|
||||
metadata_ref.get('roles', {}), False)
|
||||
except (exception.MetadataNotFound, exception.NotImplemented):
|
||||
except (exception.MetadataNotFound, # nosec
|
||||
exception.NotImplemented):
|
||||
# MetadataNotFound implies no group grant, so skip.
|
||||
# Ignore NotImplemented since not all backends support
|
||||
# domains.
|
||||
@ -169,7 +174,8 @@ class Manager(manager.Manager):
|
||||
try:
|
||||
metadata_ref = self._get_metadata(user_id=user_id,
|
||||
domain_id=domain_id)
|
||||
except (exception.MetadataNotFound, exception.NotImplemented):
|
||||
except (exception.MetadataNotFound, # nosec
|
||||
exception.NotImplemented):
|
||||
# MetadataNotFound implies no user grants.
|
||||
# Ignore NotImplemented since not all backends support
|
||||
# domains
|
||||
@ -1208,7 +1214,7 @@ class RoleManager(manager.Manager):
|
||||
def delete_role(self, role_id, initiator=None):
|
||||
try:
|
||||
self.assignment_api.delete_tokens_for_role_assignments(role_id)
|
||||
except exception.NotImplemented:
|
||||
except exception.NotImplemented: # nosec
|
||||
# FIXME(morganfainberg): Not all backends (ldap) implement
|
||||
# `list_role_assignments_for_role` which would have previously
|
||||
# caused a NotImplmented error to be raised when called through
|
||||
|
@ -55,7 +55,14 @@ class Role(assignment.RoleDriverV8):
|
||||
self.role.check_allow_create()
|
||||
try:
|
||||
self.get_role(role_id)
|
||||
except exception.NotFound:
|
||||
except exception.NotFound: # nosec
|
||||
# The call to self.get_role() raises this exception when a role
|
||||
# with the given ID doesn't exist. This was done to ensure that
|
||||
# a role with the new role's ID doesn't already exist. As such this
|
||||
# exception is expected to happen in the normal case. The abnormal
|
||||
# case would be if the role does already exist. So this exception
|
||||
# is expected to be ignored and there's no security issue with
|
||||
# ignoring it.
|
||||
pass
|
||||
else:
|
||||
msg = _('Duplicate ID, %s.') % role_id
|
||||
@ -63,7 +70,14 @@ class Role(assignment.RoleDriverV8):
|
||||
|
||||
try:
|
||||
self.role.get_by_name(role['name'])
|
||||
except exception.NotFound:
|
||||
except exception.NotFound: # nosec
|
||||
# The call to self.role.get_by_name() raises this exception when a
|
||||
# role with the given name doesn't exist. This was done to ensure
|
||||
# that a role with the new role's name doesn't already exist. As
|
||||
# such this exception is expected to happen in the normal case. The
|
||||
# abnormal case would be if a role with the same name does already
|
||||
# exist. So this exception is expected to be ignored and there's no
|
||||
# security issue with ignoring it.
|
||||
pass
|
||||
else:
|
||||
msg = _('Duplicate name, %s.') % role['name']
|
||||
@ -117,7 +131,8 @@ class RoleApi(RoleLdapStructureMixin, common_ldap.BaseLdap):
|
||||
if old_role['id'] != role_id:
|
||||
raise exception.Conflict(
|
||||
_('Cannot duplicate name %s') % old_role)
|
||||
except exception.NotFound:
|
||||
except exception.NotFound: # nosec
|
||||
# Another role with the same name doesn't exist, good.
|
||||
pass
|
||||
return super(RoleApi, self).update(role_id, role)
|
||||
|
||||
|
@ -580,7 +580,7 @@ class Auth(controller.V3Controller):
|
||||
if user_id:
|
||||
try:
|
||||
user_refs = self.assignment_api.list_projects_for_user(user_id)
|
||||
except exception.UserNotFound:
|
||||
except exception.UserNotFound: # nosec
|
||||
# federated users have an id but they don't link to anything
|
||||
pass
|
||||
|
||||
@ -601,7 +601,7 @@ class Auth(controller.V3Controller):
|
||||
if user_id:
|
||||
try:
|
||||
user_refs = self.assignment_api.list_domains_for_user(user_id)
|
||||
except exception.UserNotFound:
|
||||
except exception.UserNotFound: # nosec
|
||||
# federated users have an id but they don't link to anything
|
||||
pass
|
||||
|
||||
|
@ -129,7 +129,8 @@ class Manager(manager.Manager):
|
||||
# Check duplicate ID
|
||||
try:
|
||||
self.get_region(region_ref['id'])
|
||||
except exception.RegionNotFound:
|
||||
except exception.RegionNotFound: # nosec
|
||||
# A region with the same id doesn't exist already, good.
|
||||
pass
|
||||
else:
|
||||
msg = _('Duplicate ID, %s.') % region_ref['id']
|
||||
|
@ -428,7 +428,7 @@ class DomainConfigUploadFiles(object):
|
||||
"""
|
||||
try:
|
||||
self.upload_config_to_database(file_name, domain_name)
|
||||
except ValueError:
|
||||
except ValueError: # nosec
|
||||
# We've already given all the info we can in a message, so carry
|
||||
# on to the next one
|
||||
pass
|
||||
|
@ -169,9 +169,11 @@ class Server(service.ServiceBase):
|
||||
"""Wait until all servers have completed running."""
|
||||
try:
|
||||
self.pool.waitall()
|
||||
except KeyboardInterrupt:
|
||||
except KeyboardInterrupt: # nosec
|
||||
# If CTRL-C, just break out of the loop.
|
||||
pass
|
||||
except greenlet.GreenletExit:
|
||||
except greenlet.GreenletExit: # nosec
|
||||
# If exiting, break out of the loop.
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
@ -199,7 +201,7 @@ class Server(service.ServiceBase):
|
||||
socket, application, log=EventletFilteringLogger(logger),
|
||||
debug=False, keepalive=CONF.eventlet_server.wsgi_keep_alive,
|
||||
socket_timeout=socket_timeout)
|
||||
except greenlet.GreenletExit:
|
||||
except greenlet.GreenletExit: # nosec
|
||||
# Wait until all servers have completed running
|
||||
pass
|
||||
except Exception:
|
||||
|
@ -113,11 +113,13 @@ def enabled2py(val):
|
||||
|
||||
try:
|
||||
return LDAP_VALUES[val]
|
||||
except KeyError:
|
||||
except KeyError: # nosec
|
||||
# It wasn't a boolean value, will try as an int instead.
|
||||
pass
|
||||
try:
|
||||
return int(val)
|
||||
except ValueError:
|
||||
except ValueError: # nosec
|
||||
# It wasn't an int either, will try as utf8 instead.
|
||||
pass
|
||||
return utf8_decode(val)
|
||||
|
||||
@ -1354,7 +1356,8 @@ class BaseLdap(object):
|
||||
continue
|
||||
|
||||
v = lower_res[map_attr.lower()]
|
||||
except KeyError:
|
||||
except KeyError: # nosec
|
||||
# Didn't find the attr, so don't add it.
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
@ -1383,7 +1386,8 @@ class BaseLdap(object):
|
||||
if values.get('name') is not None:
|
||||
try:
|
||||
self.get_by_name(values['name'])
|
||||
except exception.NotFound:
|
||||
except exception.NotFound: # nosec
|
||||
# Didn't find it so it's unique, good.
|
||||
pass
|
||||
else:
|
||||
raise exception.Conflict(type=self.options_name,
|
||||
@ -1393,7 +1397,8 @@ class BaseLdap(object):
|
||||
if values.get('id') is not None:
|
||||
try:
|
||||
self.get(values['id'])
|
||||
except exception.NotFound:
|
||||
except exception.NotFound: # nosec
|
||||
# Didn't find it, so it's unique, good.
|
||||
pass
|
||||
else:
|
||||
raise exception.Conflict(type=self.options_name,
|
||||
@ -1840,7 +1845,8 @@ class EnabledEmuMixIn(BaseLdap):
|
||||
with self.get_connection() as conn:
|
||||
try:
|
||||
conn.modify_s(self.enabled_emulation_dn, modlist)
|
||||
except (ldap.NO_SUCH_OBJECT, ldap.NO_SUCH_ATTRIBUTE):
|
||||
except (ldap.NO_SUCH_OBJECT, ldap.NO_SUCH_ATTRIBUTE): # nosec
|
||||
# It's already gone, good.
|
||||
pass
|
||||
|
||||
def create(self, values):
|
||||
|
@ -154,7 +154,7 @@ def _assert_not_schema_downgrade(extension=None, version=None):
|
||||
current_ver = int(six.text_type(get_db_version(extension)))
|
||||
if int(version) < current_ver:
|
||||
raise migration.exception.DbMigrationError()
|
||||
except exceptions.DatabaseNotControlledError:
|
||||
except exceptions.DatabaseNotControlledError: # nosec
|
||||
# NOTE(morganfainberg): The database is not controlled, this action
|
||||
# cannot be a downgrade.
|
||||
pass
|
||||
@ -177,7 +177,7 @@ def _sync_extension_repo(extension, version):
|
||||
# Register the repo with the version control API
|
||||
# If it already knows about the repo, it will throw
|
||||
# an exception that we can safely ignore
|
||||
except exceptions.DatabaseAlreadyControlledError:
|
||||
except exceptions.DatabaseAlreadyControlledError: # nosec
|
||||
pass
|
||||
except exception.MigrationNotProvided as e:
|
||||
print(e)
|
||||
|
@ -448,7 +448,8 @@ def _sign_assertion(assertion):
|
||||
try:
|
||||
if file_path:
|
||||
os.remove(file_path)
|
||||
except OSError:
|
||||
except OSError: # nosec
|
||||
# The file is already gone, good.
|
||||
pass
|
||||
|
||||
return saml2.create_class_from_xml_string(saml.Assertion, stdout)
|
||||
|
@ -203,7 +203,8 @@ def get_remote_id_parameter(protocol):
|
||||
group=protocol)
|
||||
try:
|
||||
remote_id_parameter = CONF[protocol]['remote_id_attribute']
|
||||
except AttributeError:
|
||||
except AttributeError: # nosec
|
||||
# No remote ID attr, will be logged and use the default instead.
|
||||
pass
|
||||
if not remote_id_parameter:
|
||||
LOG.debug('Cannot find "remote_id_attribute" in configuration '
|
||||
|
@ -217,7 +217,8 @@ class Manager(manager.Manager):
|
||||
service_id=endpoint['service_id'],
|
||||
region_id=region_id)
|
||||
return ref['policy_id']
|
||||
except exception.PolicyAssociationNotFound:
|
||||
except exception.PolicyAssociationNotFound: # nosec
|
||||
# There wasn't one for that region & service, handle below.
|
||||
pass
|
||||
|
||||
# There wasn't one for that region & service, let's
|
||||
@ -239,7 +240,9 @@ class Manager(manager.Manager):
|
||||
try:
|
||||
ref = self.driver.get_policy_association(endpoint_id=endpoint_id)
|
||||
return _get_policy(ref['policy_id'], endpoint_id)
|
||||
except exception.PolicyAssociationNotFound:
|
||||
except exception.PolicyAssociationNotFound: # nosec
|
||||
# There wasn't a policy explicitly defined for this endpoint,
|
||||
# handled below.
|
||||
pass
|
||||
|
||||
# There wasn't a policy explicitly defined for this endpoint, so
|
||||
@ -255,7 +258,8 @@ class Manager(manager.Manager):
|
||||
ref = self.driver.get_policy_association(
|
||||
service_id=endpoint['service_id'])
|
||||
return _get_policy(ref['policy_id'], endpoint_id)
|
||||
except exception.PolicyAssociationNotFound:
|
||||
except exception.PolicyAssociationNotFound: # nosec
|
||||
# No policy is associated with endpoint, handled below.
|
||||
pass
|
||||
|
||||
msg = _('No policy is associated with endpoint '
|
||||
|
@ -149,7 +149,7 @@ class User(controller.V2Controller):
|
||||
try:
|
||||
self.assignment_api.add_user_to_project(
|
||||
user_ref['tenantId'], user_id)
|
||||
except exception.Conflict:
|
||||
except exception.Conflict: # nosec
|
||||
# We are already a member of that tenant
|
||||
pass
|
||||
except exception.NotFound:
|
||||
|
@ -70,7 +70,8 @@ def filter_user(user_ref):
|
||||
try:
|
||||
user_ref['extra'].pop('password', None)
|
||||
user_ref['extra'].pop('tenants', None)
|
||||
except KeyError:
|
||||
except KeyError: # nosec
|
||||
# ok to not have extra in the user_ref.
|
||||
pass
|
||||
return user_ref
|
||||
|
||||
@ -404,7 +405,7 @@ class DomainConfigs(dict):
|
||||
# specific driver for this domain.
|
||||
try:
|
||||
del self[domain_id]
|
||||
except KeyError:
|
||||
except KeyError: # nosec
|
||||
# Allow this error in case we are unlucky and in a
|
||||
# multi-threaded situation, two threads happen to be running
|
||||
# in lock step.
|
||||
|
@ -78,7 +78,7 @@ class Mapping(identity.MappingDriverV8):
|
||||
try:
|
||||
session.query(IDMapping).filter(
|
||||
IDMapping.public_id == public_id).delete()
|
||||
except sql.NotFound:
|
||||
except sql.NotFound: # nosec
|
||||
# NOTE(morganfainberg): There is nothing to delete and nothing
|
||||
# to do.
|
||||
pass
|
||||
|
@ -116,7 +116,7 @@ class KeystoneToken(dict):
|
||||
return self['user']['domain']['name']
|
||||
elif 'user' in self:
|
||||
return "Default"
|
||||
except KeyError:
|
||||
except KeyError: # nosec
|
||||
# Do not raise KeyError, raise UnexpectedError
|
||||
pass
|
||||
raise exception.UnexpectedError()
|
||||
@ -128,7 +128,7 @@ class KeystoneToken(dict):
|
||||
return self['user']['domain']['id']
|
||||
elif 'user' in self:
|
||||
return CONF.identity.default_domain_id
|
||||
except KeyError:
|
||||
except KeyError: # nosec
|
||||
# Do not raise KeyError, raise UnexpectedError
|
||||
pass
|
||||
raise exception.UnexpectedError()
|
||||
@ -184,7 +184,7 @@ class KeystoneToken(dict):
|
||||
return self['project']['domain']['id']
|
||||
elif 'tenant' in self['token']:
|
||||
return CONF.identity.default_domain_id
|
||||
except KeyError:
|
||||
except KeyError: # nosec
|
||||
# Do not raise KeyError, raise UnexpectedError
|
||||
pass
|
||||
|
||||
@ -197,7 +197,7 @@ class KeystoneToken(dict):
|
||||
return self['project']['domain']['name']
|
||||
if 'tenant' in self['token']:
|
||||
return 'Default'
|
||||
except KeyError:
|
||||
except KeyError: # nosec
|
||||
# Do not raise KeyError, raise UnexpectedError
|
||||
pass
|
||||
|
||||
|
@ -130,7 +130,8 @@ class DomainConfig(resource.DomainConfigDriverV8):
|
||||
ref = ConfigRegister(type=type, domain_id=domain_id)
|
||||
session.add(ref)
|
||||
return True
|
||||
except sql.DBDuplicateEntry:
|
||||
except sql.DBDuplicateEntry: # nosec
|
||||
# Continue on and return False to indicate failure.
|
||||
pass
|
||||
return False
|
||||
|
||||
|
@ -317,7 +317,8 @@ class TokenDriverV8(object):
|
||||
for token in token_list:
|
||||
try:
|
||||
self.delete_token(token)
|
||||
except exception.NotFound:
|
||||
except exception.NotFound: # nosec
|
||||
# The token is already gone, good.
|
||||
pass
|
||||
return token_list
|
||||
|
||||
|
@ -176,7 +176,7 @@ def rotate_keys(keystone_user_id=None, keystone_group_id=None):
|
||||
if os.path.isfile(path):
|
||||
try:
|
||||
key_id = int(filename)
|
||||
except ValueError:
|
||||
except ValueError: # nosec : name isn't a number, ignore the file.
|
||||
pass
|
||||
else:
|
||||
key_files[key_id] = path
|
||||
@ -243,7 +243,8 @@ def load_keys():
|
||||
with open(path, 'r') as key_file:
|
||||
try:
|
||||
key_id = int(filename)
|
||||
except ValueError:
|
||||
except ValueError: # nosec : filename isn't a number, ignore
|
||||
# this file since it's not a key.
|
||||
pass
|
||||
else:
|
||||
keys[key_id] = key_file.read()
|
||||
|
@ -192,7 +192,7 @@ class Manager(manager.Manager):
|
||||
# recursive call to make sure all notifications are sent
|
||||
try:
|
||||
self.delete_trust(t['id'])
|
||||
except exception.TrustNotFound:
|
||||
except exception.TrustNotFound: # nosec
|
||||
# if trust was deleted by concurrent process
|
||||
# consistency must not suffer
|
||||
pass
|
||||
|
Loading…
Reference in New Issue
Block a user