Replace assert to raise AssertionError

This patch is a fix for the issue B101 assert_used found by bandit
scanner.
As assert functionality could be turned off by the -O option for python
interpreter, assert statements were replaced to raise AssertionError.

The main reasoning for that is rest code after assert could be not
ready for values that assert filters.

Change-Id: I16d9c228f0f8aadae2044ce8fb95d7738970b0d5
This commit is contained in:
Ivan Kolodyazhny 2018-03-07 13:56:41 +02:00 committed by BubaVV
parent 30b731ac2e
commit 6bfa771054
2 changed files with 11 additions and 5 deletions

View File

@ -98,7 +98,8 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
# the actual primary key, rather than assuming its id
LOG.warning('Id not in sort_keys; is sort_keys unique?')
assert(not (sort_dir and sort_dirs))
if sort_dir and sort_dirs:
raise AssertionError('Both sort_dir and sort_dirs specified.')
# Default the sort direction to ascending
if sort_dirs is None and sort_dir is None:
@ -108,7 +109,9 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
if sort_dirs is None:
sort_dirs = [sort_dir for _sort_key in sort_keys]
assert(len(sort_dirs) == len(sort_keys))
if len(sort_dirs) != len(sort_keys):
raise AssertionError(
'sort_dirs length is not equal to sort_keys length.')
# Add sorting
for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):

View File

@ -139,7 +139,8 @@ class RequestContextSerializer(messaging.Serializer):
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
if TRANSPORT is None:
raise AssertionError('RPC transport is not initialized.')
serializer = RequestContextSerializer(serializer)
return messaging.RPCClient(TRANSPORT,
target,
@ -148,7 +149,8 @@ def get_client(target, version_cap=None, serializer=None):
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
if TRANSPORT is None:
raise AssertionError('RPC transport is not initialized.')
serializer = RequestContextSerializer(serializer)
access_policy = dispatcher.DefaultRPCAccessPolicy
return messaging.get_rpc_server(TRANSPORT,
@ -161,7 +163,8 @@ def get_server(target, endpoints, serializer=None):
@utils.if_notifications_enabled
def get_notifier(service=None, host=None, publisher_id=None):
assert NOTIFIER is not None
if NOTIFIER is None:
raise AssertionError('RPC Notifier is not initialized.')
if not publisher_id:
publisher_id = "%s.%s" % (service, host or CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)