pyupgrade changes for Python3.9+

As discussed at the Epoxy PTG meeting, run an automated
upgrade tool to make code python 3.9+ compliant.

Result of running:

$ pyupgrade --py39-plus $(git ls-files | grep ".py$")

Fixed PEP8 errors introduced by pyupgrade by running:

$ autopep8 --select=E127,E128,E501 --max-line-length 79 -r \
  --in-place neutron_lib

Also did manual updates as necessary to fix other errors
and warnings after above commands. Removed a number of
pylint 'disable' directives.

Inspired by Octavia and Nova [0].

[0] https://review.opendev.org/c/openstack/nova/+/896986

Change-Id: Ic8c95898fc00d5463aa0992e844eee60ac3dc5a0
This commit is contained in:
Brian Haley 2024-11-04 21:16:16 -05:00
parent b60e363e5e
commit cd8df72cc6
71 changed files with 197 additions and 230 deletions

View File

@ -10,12 +10,8 @@ ignore=.git,tests
# with a local disable)
disable=
# "F" Fatal errors that prevent further processing
import-error,
# "I" Informational noise
c-extension-no-member,
locally-disabled,
# "E" Error for important programming issues (likely bugs)
access-member-before-definition,
no-member,
no-method-argument,
no-self-argument,
@ -24,65 +20,35 @@ disable=
abstract-method,
arguments-differ,
attribute-defined-outside-init,
bad-indentation,
broad-except,
dangerous-default-value,
expression-not-assigned,
fixme,
global-statement,
keyword-arg-before-vararg,
literal-comparison,
non-parent-init-called,
not-callable,
protected-access,
redefined-builtin,
redefined-outer-name,
signature-differs,
super-init-not-called,
unpacking-non-sequence,
unused-argument,
unused-import,
unused-variable,
useless-super-delegation,
using-constant-test,
redundant-u-string-prefix,
# "C" Coding convention violations
consider-iterating-dictionary,
consider-using-enumerate,
consider-using-f-string,
invalid-name,
len-as-condition,
missing-docstring,
singleton-comparison,
superfluous-parens,
ungrouped-imports,
wrong-import-order,
# "R" Refactor recommendations
consider-merging-isinstance,
consider-using-ternary,
duplicate-code,
inconsistent-return-statements,
no-else-return,
redefined-argument-from-local,
simplifiable-if-statement,
too-few-public-methods,
too-many-ancestors,
too-many-arguments,
too-many-branches,
too-many-instance-attributes,
too-many-lines,
too-many-locals,
too-many-nested-blocks,
too-many-positional-arguments,
too-many-public-methods,
too-many-return-statements,
too-many-statements,
use-dict-literal,
super-with-arguments,
# new for python3 version of pylint
consider-using-set-comprehension,
unnecessary-pass,
useless-object-inheritance
unnecessary-pass
[BASIC]
# Variable names can be 1 to 31 characters long, with lowercase and underscores

View File

@ -30,7 +30,7 @@ def _replace_register(flow_params, register_number, register_value):
try:
reg_port = flow_params[register_value]
del flow_params[register_value]
flow_params['reg{:d}'.format(register_number)] = reg_port
flow_params[f'reg{register_number:d}'] = reg_port
except KeyError:
pass

View File

@ -13,7 +13,7 @@
import abc
class AgentExtension(object, metaclass=abc.ABCMeta):
class AgentExtension(metaclass=abc.ABCMeta):
"""Define stable abstract interface for agent extensions.
An agent extension extends the agent core functionality.

View File

@ -15,7 +15,7 @@ import abc
from neutron_lib import constants
class LinuxInterfaceDriver(object, metaclass=abc.ABCMeta):
class LinuxInterfaceDriver(metaclass=abc.ABCMeta):
DEV_NAME_LEN = constants.LINUX_DEV_LEN
DEV_NAME_PREFIX = constants.TAP_DEVICE_PREFIX

View File

@ -65,5 +65,5 @@ def get_topic_name(prefix, table, operation, host=None):
:returns: The topic name.
"""
if host:
return '%s-%s-%s.%s' % (prefix, table, operation, host)
return '%s-%s-%s' % (prefix, table, operation)
return '{}-{}-{}.{}'.format(prefix, table, operation, host)
return '{}-{}-{}'.format(prefix, table, operation)

View File

@ -137,7 +137,7 @@ def _dict_populate_defaults(attr_value, attr_spec):
return attr_value
class AttributeInfo(object):
class AttributeInfo:
"""Provides operations on a resource's attribute map.
AttributeInfo wraps an API resource's attribute dict and provides methods
@ -239,7 +239,7 @@ class AttributeInfo(object):
def _project_id_required(self, res_dict):
return (('tenant_id' in self.attributes or
'project_id' in self.attributes) and
'project_id' in self.attributes) and
'project_id' not in res_dict)
def populate_project_id(self, context, res_dict, is_create):
@ -313,5 +313,5 @@ def retrieve_valid_sort_keys(attr_info):
:param attr_info: The attribute dict for common neutron resource.
:returns: Set of sort keys.
"""
return set(attr for attr, schema in attr_info.items()
if schema.get('is_sort_key', False))
return {attr for attr, schema in attr_info.items()
if schema.get('is_sort_key', False)}

View File

@ -137,7 +137,7 @@ def convert_kvp_list_to_dict(kvp_list):
key, value = convert_kvp_str_to_list(kvp_str)
kvp_map.setdefault(key, set())
kvp_map[key].add(value)
return dict((x, list(y)) for x, y in kvp_map.items())
return {x: list(y) for x, y in kvp_map.items()}
def convert_none_to_empty_list(value):

View File

@ -13,7 +13,7 @@
# under the License.
from neutron_lib.api import converters
from neutron_lib.api.definitions import network
from neutron_lib.api.definitions import network as network_def
from neutron_lib.api import validators
from neutron_lib import constants
@ -43,7 +43,7 @@ API_PREFIX = ''
DESCRIPTION = 'Provides Vlan Transparent Networks'
UPDATED_TIMESTAMP = '2015-03-23T09:00:00-00:00'
RESOURCE_ATTRIBUTE_MAP = {
network.COLLECTION_NAME: {
network_def.COLLECTION_NAME: {
VLANTRANSPARENT: {
'allow_post': True,
'allow_put': False,

View File

@ -32,7 +32,7 @@ def is_extension_supported(plugin, alias):
return alias in getattr(plugin, "supported_extension_aliases", [])
class ExtensionDescriptor(object, metaclass=abc.ABCMeta):
class ExtensionDescriptor(metaclass=abc.ABCMeta):
"""Base class that defines the contract for extensions."""
@abc.abstractmethod

View File

@ -37,8 +37,8 @@ UNLIMITED = None
# Note: In order to ensure that the MAC address is unicast the first byte
# must be even.
MAC_PATTERN = "^%s[aceACE02468](:%s{2}){5}$" % (constants.HEX_ELEM,
constants.HEX_ELEM)
MAC_PATTERN = "^{}[aceACE02468](:{}{{2}}){{5}}$".format(constants.HEX_ELEM,
constants.HEX_ELEM)
def _verify_dict_keys(expected_keys, target_dict, strict=True):

View File

@ -65,12 +65,12 @@ def _validate_allowed_address_pairs(address_pairs, valid_values=None):
raise exceptions.DuplicateAddressPairInRequest(
mac_address=mac, ip_address=ip_address)
invalid_attrs = set(address_pair.keys()) - set(['mac_address',
'ip_address'])
invalid_attrs = set(address_pair.keys()) - {'mac_address',
'ip_address'}
if invalid_attrs:
msg = (_("Unrecognized attribute(s) '%s'") %
', '.join(set(address_pair.keys()) -
set(['mac_address', 'ip_address'])))
{'mac_address', 'ip_address'}))
raise exc.HTTPBadRequest(msg)
if '/' in ip_address:

View File

@ -34,6 +34,6 @@ def convert_and_validate_segments(segments, valid_values=None):
if len(segment) != 3:
msg = (_("Unrecognized attribute(s) '%s'") %
', '.join(set(segment.keys()) -
set([pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID])))
{pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID}))
raise exc.HTTPBadRequest(msg)

View File

@ -57,7 +57,7 @@ def is_cancellable_event(event):
event.startswith(PRECOMMIT))
class EventPayload(object):
class EventPayload:
"""Base event payload object.
This class is intended to be the super class for all event payloads. As

View File

@ -48,7 +48,7 @@ class CallbackFailure(exceptions.MultipleExceptions):
return exc
class NotificationError(object):
class NotificationError:
def __init__(self, callback_id, error, cancellable=False):
self.callback_id = callback_id
@ -56,7 +56,8 @@ class NotificationError(object):
self._cancellable = cancellable
def __str__(self):
return 'Callback %s failed with "%s"' % (self.callback_id, self.error)
return 'Callback {} failed with "{}"'.format(
self.callback_id, self.error)
@property
def is_cancellable(self):

View File

@ -27,7 +27,7 @@ Callback = collections.namedtuple(
'Callback', ['id', 'method', 'cancellable'])
class CallbacksManager(object):
class CallbacksManager:
"""A callback system that allows objects to cooperate in a loose manner."""
def __init__(self):
@ -102,7 +102,7 @@ class CallbacksManager(object):
if not self._index[callback_id]:
del self._index[callback_id]
else:
value = '%s,%s' % (resource, event)
value = '{},{}'.format(resource, event)
raise exceptions.Invalid(element='resource,event', value=value)
def unsubscribe_by_resource(self, callback, resource):

View File

@ -404,7 +404,7 @@ TC_QDISC_PARENTS = {'root': 0xffffffff,
'ingress': 0xfffffff1}
class Sentinel(object):
class Sentinel:
"""A constant object that does not change even when copied."""
def __deepcopy__(self, memo):
# Always return the same object because this is essentially a constant.

View File

@ -137,7 +137,7 @@ class ContextBase(oslo_context.RequestContext):
context.is_admin = True
context.roles = list(
set(context.roles) | set(['admin', 'member', 'reader'])
set(context.roles) | {'admin', 'member', 'reader'}
)
return context

View File

@ -19,7 +19,7 @@ from sqlalchemy import orm
from neutron_lib.db import constants as db_const
class HasProject(object):
class HasProject:
"""Project mixin, add to subclasses that have a user."""
# NOTE: project_id is just a free form string
@ -62,7 +62,7 @@ class HasProjectPrimaryUniqueKey(HasProject):
nullable=False, primary_key=True, unique=True)
class HasId(object):
class HasId:
"""id mixin, add to subclasses that have an id."""
id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE),
@ -70,7 +70,7 @@ class HasId(object):
default=uuidutils.generate_uuid)
class HasStatusDescription(object):
class HasStatusDescription:
"""Status with description mixin."""
status = sa.Column(sa.String(db_const.STATUS_FIELD_SIZE),
@ -95,11 +95,11 @@ class _NeutronBase(models.ModelBase):
def __repr__(self):
"""sqlalchemy based automatic __repr__ method."""
items = ['%s=%r' % (col.name, getattr(self, col.name))
items = ['{}={!r}'.format(col.name, getattr(self, col.name))
for col in self.__table__.columns]
return "<%s.%s[object at %x] {%s}>" % (self.__class__.__module__,
self.__class__.__name__,
id(self), ', '.join(items))
return "<{}.{}[object at {:x}] {{{}}}>".format(
self.__class__.__module__, self.__class__.__name__,
id(self), ', '.join(items))
class NeutronBaseV2(_NeutronBase):

View File

@ -192,8 +192,8 @@ def query_with_hooks(context, model, field=None, lazy_fields=None):
query = query.group_by(group_by)
if lazy_fields:
for field in lazy_fields:
query = query.options(lazyload(field))
for lfield in lazy_fields:
query = query.options(lazyload(lfield))
return query

View File

@ -15,7 +15,7 @@
import abc
class QuotaDriverAPI(object, metaclass=abc.ABCMeta):
class QuotaDriverAPI(metaclass=abc.ABCMeta):
@staticmethod
@abc.abstractmethod

View File

@ -89,7 +89,7 @@ class StandardAttribute(model_base.BASEV2):
return self.id
class HasStandardAttributes(object):
class HasStandardAttributes:
@classmethod
def get_api_collections(cls):

View File

@ -151,7 +151,7 @@ def filter_non_model_columns(data, model):
proxies of the model.
"""
mapper = sqlalchemy.inspect(model)
columns = set(c.name for c in mapper.columns)
columns = {c.name for c in mapper.columns}
try:
_association_proxy = associationproxy.ASSOCIATION_PROXY
except AttributeError:
@ -160,8 +160,8 @@ def filter_non_model_columns(data, model):
associationproxy.AssociationProxyExtensionType.ASSOCIATION_PROXY)
columns.update(d.value_attr for d in mapper.all_orm_descriptors
if d.extension_type is _association_proxy)
return dict((k, v) for (k, v)
in data.items() if k in columns)
return {k: v for (k, v)
in data.items() if k in columns}
def model_query_scope_is_project(context, model):

View File

@ -83,7 +83,7 @@ def _check_imports(regex, submatch, logical_line):
def _check_namespace_imports(failure_code, namespace, new_ns, logical_line,
message_override=None):
if message_override is not None:
msg_o = "%s: %s" % (failure_code, message_override)
msg_o = "{}: {}".format(failure_code, message_override)
else:
msg_o = None

View File

@ -27,10 +27,10 @@ _log_warn = re.compile(
_log_translation_hint = re.compile(
r".*LOG\.(%(levels)s)\(\s*(%(hints)s)\(" % {
'levels': '|'.join(_all_log_levels),
'hints': '|'.join(_all_hints),
})
r".*LOG\.({levels})\(\s*({hints})\(".format(
levels='|'.join(_all_log_levels),
hints='|'.join(_all_hints),
))
def _translation_checks_not_enforced(filename):

View File

@ -26,7 +26,7 @@ def convert_filters(**kwargs):
return result
class FilterObj(object, metaclass=abc.ABCMeta):
class FilterObj(metaclass=abc.ABCMeta):
@abc.abstractmethod
def filter(self, column):

View File

@ -17,7 +17,7 @@ import functools
import re
import time
from urllib import parse
import uuid
import uuid as uuid_lib
import requests
@ -77,12 +77,12 @@ def _get_version(openstack_api_version):
class UUIDEncoder(jsonutils.JSONEncoder):
def default(self, o):
if isinstance(o, uuid.UUID):
if isinstance(o, uuid_lib.UUID):
return str(o)
return super().default(o)
class NoAuthClient(object):
class NoAuthClient:
"""Placement NoAuthClient for fullstack testing"""
def __init__(self, url):
@ -127,22 +127,22 @@ class NoAuthClient(object):
raise ks_exc.HttpError
def get(self, url, endpoint_filter, **kwargs):
return self.request('%s%s' % (self.url, url), 'GET', **kwargs)
return self.request('{}{}'.format(self.url, url), 'GET', **kwargs)
def post(self, url, json, endpoint_filter, **kwargs):
return self.request('%s%s' % (self.url, url), 'POST', body=json,
return self.request('{}{}'.format(self.url, url), 'POST', body=json,
**kwargs)
def put(self, url, json, endpoint_filter, **kwargs):
resp = self.request('%s%s' % (self.url, url), 'PUT', body=json,
resp = self.request('{}{}'.format(self.url, url), 'PUT', body=json,
**kwargs)
return resp
def delete(self, url, endpoint_filter, **kwargs):
return self.request('%s%s' % (self.url, url), 'DELETE', **kwargs)
return self.request('{}{}'.format(self.url, url), 'DELETE', **kwargs)
class PlacementAPIClient(object):
class PlacementAPIClient:
"""Client class for placement ReST API."""
def __init__(self, conf,
@ -377,7 +377,7 @@ class PlacementAPIClient(object):
filters['in_tree'] = in_tree
if uuid:
filters['uuid'] = uuid
url = '%s?%s' % (url, parse.urlencode(filters))
url = '{}?{}'.format(url, parse.urlencode(filters))
return self._get(url).json()
@_check_placement_api_available
@ -454,7 +454,7 @@ class PlacementAPIClient(object):
:raises PlacementInventoryNotFound: No inventory of class.
:returns: None.
"""
url = '/resource_providers/%s/inventories/%s' % (
url = '/resource_providers/{}/inventories/{}'.format(
resource_provider_uuid, resource_class)
try:
self._delete(url)
@ -481,7 +481,7 @@ class PlacementAPIClient(object):
for a resource provider.
:returns: The inventory of the resource class as a dict.
"""
url = '/resource_providers/%s/inventories/%s' % (
url = '/resource_providers/{}/inventories/{}'.format(
resource_provider_uuid, resource_class)
try:
return self._get(url).json()
@ -515,7 +515,7 @@ class PlacementAPIClient(object):
server side.
:returns: The updated inventory of the resource class as a dict.
"""
url = '/resource_providers/%s/inventories/%s' % (
url = '/resource_providers/{}/inventories/{}'.format(
resource_provider_uuid, resource_class)
body = inventory

View File

@ -20,13 +20,13 @@ TRAIT_PREFIX_PHYSNET = 'PHYSNET_'
# are left out intentionally. See also:
# https://docs.openstack.org/api-ref/placement
# /#update-resource-provider-inventory
INVENTORY_OPTIONS = set([
INVENTORY_OPTIONS = {
'allocation_ratio',
'max_unit',
'min_unit',
'reserved',
'step_size',
])
}
# Tunnelled networks resource provider default name.
RP_TUNNELLED = 'rp_tunnelled'

View File

@ -27,7 +27,7 @@ def physnet_trait(physnet):
:param physnet: The physnet name.
:returns: The trait name representing the physnet.
"""
return os_traits.normalize_name('%s%s' % (
return os_traits.normalize_name('{}{}'.format(
place_const.TRAIT_PREFIX_PHYSNET, physnet))
@ -37,7 +37,7 @@ def vnic_type_trait(vnic_type):
:param physnet: The vnic_type.
:returns: The trait name representing the vnic_type.
"""
return os_traits.normalize_name('%s%s' % (
return os_traits.normalize_name('{}{}'.format(
place_const.TRAIT_PREFIX_VNIC_TYPE, vnic_type))

View File

@ -22,7 +22,7 @@ from neutron_lib.plugins import constants
_synchronized = lockutils.synchronized_with_prefix("neutron-")
class _PluginDirectory(object):
class _PluginDirectory:
"""A directory of activated plugins in a Neutron Deployment.
The directory is bootstrapped by a Neutron Manager running in
@ -44,8 +44,8 @@ class _PluginDirectory(object):
@property
def plugins(self):
"""The mapping alias -> weak reference to the plugin."""
return dict((x, weakref.proxy(y))
for x, y in self._plugins.items())
return {x: weakref.proxy(y)
for x, y in self._plugins.items()}
@property
def unique_plugins(self):

View File

@ -33,7 +33,7 @@ BOUND_DRIVER = 'bound_driver'
BOUND_SEGMENT = 'bound_segment'
class MechanismDriver(object, metaclass=abc.ABCMeta):
class MechanismDriver(metaclass=abc.ABCMeta):
"""Define stable abstract interface for ML2 mechanism drivers.
A mechanism driver is called on the creation, update, and deletion
@ -493,7 +493,7 @@ class MechanismDriver(object, metaclass=abc.ABCMeta):
return portbindings.CONNECTIVITY_LEGACY
class _TypeDriverBase(object, metaclass=abc.ABCMeta):
class _TypeDriverBase(metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_type(self):
@ -703,7 +703,7 @@ class ML2TypeDriver(_TypeDriverBase, metaclass=abc.ABCMeta):
"""
class NetworkContext(object, metaclass=abc.ABCMeta):
class NetworkContext(metaclass=abc.ABCMeta):
"""Context passed to MechanismDrivers for changes to network resources.
A NetworkContext instance wraps a network resource. It provides
@ -740,7 +740,7 @@ class NetworkContext(object, metaclass=abc.ABCMeta):
"""Return the segments associated with this network resource."""
class SubnetContext(object, metaclass=abc.ABCMeta):
class SubnetContext(metaclass=abc.ABCMeta):
"""Context passed to MechanismDrivers for changes to subnet resources.
A SubnetContext instance wraps a subnet resource. It provides
@ -772,7 +772,7 @@ class SubnetContext(object, metaclass=abc.ABCMeta):
"""
class PortContext(object, metaclass=abc.ABCMeta):
class PortContext(metaclass=abc.ABCMeta):
"""Context passed to MechanismDrivers for changes to port resources.
A PortContext instance wraps a port resource. It provides helper
@ -1084,7 +1084,7 @@ class PortContext(object, metaclass=abc.ABCMeta):
"""
class ExtensionDriver(object, metaclass=abc.ABCMeta):
class ExtensionDriver(metaclass=abc.ABCMeta):
"""Define stable abstract interface for ML2 extension drivers.
An extension driver extends the core resources implemented by the

View File

@ -37,11 +37,11 @@ LOG = logging.getLogger(__name__)
INTERFACE_HASH_LEN = 6
def _is_valid_range(val, min, max):
def _is_valid_range(val, min_val, max_val):
try:
# NOTE: use str value to not permit booleans
val = int(str(val))
return min <= val <= max
return min_val <= val <= max_val
except (ValueError, TypeError):
return False

View File

@ -18,19 +18,20 @@ import collections
import random
import time
from neutron_lib._i18n import _
from neutron_lib import context
from neutron_lib import exceptions
from neutron_lib.utils import runtime
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging import exceptions as oslomsg_exc
from oslo_messaging import serializer as om_serializer
from oslo_service import service
from oslo_service import service as os_service
from oslo_utils import excutils
from osprofiler import profiler
from neutron_lib._i18n import _
from neutron_lib import context
from neutron_lib import exceptions
from neutron_lib.utils import runtime
LOG = logging.getLogger(__name__)
TRANSPORT = None
@ -88,7 +89,7 @@ def _get_rpc_response_max_timeout():
return TRANSPORT.conf.rpc_response_max_timeout
class _ContextWrapper(object):
class _ContextWrapper:
def __init__(self, original_context):
self._original_context = original_context
@ -138,8 +139,8 @@ class _BackingOffContextWrapper(_ContextWrapper):
# two methods with the same name in different namespaces should
# be tracked independently
if self._original_context.target.namespace:
scoped_method = '%s.%s' % (self._original_context.target.namespace,
method)
scoped_method = '{}.{}'.format(
self._original_context.target.namespace, method)
else:
scoped_method = method
# set the timeout from the global method timeout tracker for this
@ -244,7 +245,7 @@ def get_notifier(service=None, host=None, publisher_id=None):
"None. This is deprecated since 2025.1 release and "
"will be removed in one of the future releases. "
"Please always pass the 'service' argument.")
publisher_id = "%s.%s" % (service, host or cfg.CONF.host)
publisher_id = "{}.{}".format(service, host or cfg.CONF.host)
serializer = RequestContextSerializer()
return oslo_messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer,
@ -288,7 +289,7 @@ class RequestContextSerializer(om_serializer.Serializer):
@profiler.trace_cls("rpc")
class Service(service.Service):
class Service(os_service.Service):
"""Service object for binaries running on hosts.
A service enables rpc by listening to queues based on topic and host.
@ -332,7 +333,7 @@ class Service(service.Service):
super().stop()
class Connection(object):
class Connection:
"""A utility class that manages a collection of RPC servers."""
def __init__(self):

View File

@ -13,7 +13,7 @@
import abc
class WorkerBase(object):
class WorkerBase:
@property
def _workers(self):

View File

@ -25,7 +25,7 @@ LOG = logging.getLogger(__name__)
@registry.has_registry_receivers
class DriverBase(object):
class DriverBase:
def __init__(self, name, vif_types, vnic_types,
supported_rules,

View File

@ -115,7 +115,7 @@ class BaseTestCase(testtools.TestCase):
conf(args)
def setUp(self):
super(BaseTestCase, self).setUp()
super().setUp()
self.useFixture(fixture.PluginDirectoryFixture())
# Enabling 'use_fatal_exceptions' allows us to catch string

View File

@ -99,7 +99,7 @@ def get_ignored_traceback(tb):
return ignored_tracebacks[-1]
class FilteredTraceback(object):
class FilteredTraceback:
"""Wraps a traceback to filter unwanted frames."""
def __init__(self, tb, filtered_traceback):

View File

@ -32,10 +32,10 @@ class TestCreateRegNumbers(_base.BaseTestCase):
constants.INGRESS_BW_LIMIT_REG_NAME: 4,
constants.MIN_BW_REG_NAME: 5}
expected_flow = {'foo': 'bar',
'reg{:d}'.format(constants.REG_PORT): 1,
'reg{:d}'.format(constants.REG_NET): 2,
'reg{:d}'.format(constants.REG_REMOTE_GROUP): 3,
'reg{:d}'.format(constants.REG_INGRESS_BW_LIMIT): 4,
'reg{:d}'.format(constants.REG_MIN_BW): 5}
f'reg{constants.REG_PORT:d}': 1,
f'reg{constants.REG_NET:d}': 2,
f'reg{constants.REG_REMOTE_GROUP:d}': 3,
f'reg{constants.REG_INGRESS_BW_LIMIT:d}': 4,
f'reg{constants.REG_MIN_BW:d}': 5}
utils.create_reg_numbers(flow)
self.assertEqual(expected_flow, flow)

View File

@ -23,7 +23,7 @@ from neutron_lib.tests import _base as test_base
def assert_bool(tester, attribute, attribute_dict, keyword, value):
tester.assertIsInstance(
value, bool,
'%s must be a boolean for %s.' % (keyword, attribute))
'{} must be a boolean for {}.'.format(keyword, attribute))
def assert_converter(tester, attribute, attribute_dict, keyword, value):
@ -45,12 +45,13 @@ def assert_converter(tester, attribute, attribute_dict, keyword, value):
def assert_true(tester, attribute, attribute_dict, keyword, value):
tester.assertTrue(
value, '%s must be True for %s.' % (keyword, attribute))
value, '{} must be True for {}.'.format(keyword, attribute))
def assert_validator(tester, attribute, attribute_dict, keyword, value):
tester.assertIn(list(value)[0], validators.validators,
'%s is not a known validator for %s.' % (value, attribute))
'{} is not a known validator for {}.'.format(
value, attribute))
ASSERT_FUNCTIONS = {
@ -79,7 +80,7 @@ class DefinitionBaseTestCase(test_base.BaseTestCase):
extension_attributes = ()
def setUp(self):
super(DefinitionBaseTestCase, self).setUp()
super().setUp()
if not self.extension_module:
self.fail("Missing extension module definition.")
self.alias = self.extension_module.ALIAS

View File

@ -34,8 +34,7 @@ class BgpvpnDefinitionTestCase(base.DefinitionBaseTestCase):
['4294967295:65536'],
[''],
]
for value in values:
yield value
yield from values
def _data_for_valid_rtdt(self):
values = [['1:1'],
@ -48,8 +47,7 @@ class BgpvpnDefinitionTestCase(base.DefinitionBaseTestCase):
['65536:65535'],
['4294967295:65535'],
]
for value in values:
yield value
yield from values
def test_valid_rtrd(self):
for rtrd in self._data_for_valid_rtdt():

View File

@ -353,6 +353,6 @@ class TestRetrieveValidSortKeys(base.BaseTestCase):
"visible": True,
}
}
expect_val = set(["id", "name"])
expect_val = {"id", "name"}
actual_val = attributes.retrieve_valid_sort_keys(attr_info)
self.assertEqual(expect_val, actual_val)

View File

@ -165,7 +165,7 @@ class TestConvertToList(base.BaseTestCase):
self.assertEqual([item], converters.convert_to_list(item))
def test_convert_to_list_iterable(self):
for item in ([None], [1, 2, 3], (1, 2, 3), set([1, 2, 3]), ['foo']):
for item in ([None], [1, 2, 3], (1, 2, 3), {1, 2, 3}, ['foo']):
self.assertEqual(list(item), converters.convert_to_list(item))
def test_convert_to_list_non_iterable(self):
@ -188,11 +188,11 @@ class TestConvertIPv6AddrCanonicalFormat(base.BaseTestCase):
def test_convert_ipv6_extended_addr_to_compressed(self):
result = converters.convert_ip_to_canonical_format(
u"Fe80:0:0:0:0:0:0:1")
"Fe80:0:0:0:0:0:0:1")
self.assertEqual('fe80::1', result)
def test_convert_ipv4_address(self):
result = converters.convert_ip_to_canonical_format(u"192.168.1.1")
result = converters.convert_ip_to_canonical_format("192.168.1.1")
self.assertEqual('192.168.1.1', result)
def test_convert_None_address(self):
@ -264,7 +264,7 @@ class TestConvertIPv6CIDRCanonicalFormat(base.BaseTestCase):
class TestConvertStringToCaseInsensitive(base.BaseTestCase):
def test_convert_string_to_lower(self):
result = converters.convert_string_to_case_insensitive(u"THIS Is tEsT")
result = converters.convert_string_to_case_insensitive("THIS Is tEsT")
self.assertIsInstance(result, str)
def test_assert_error_on_non_string(self):

View File

@ -99,7 +99,7 @@ class DummyPlugin(service_base.ServicePluginBase):
class TestExtensionIsSupported(base.BaseTestCase):
def setUp(self):
super(TestExtensionIsSupported, self).setUp()
super().setUp()
self._plugin = DummyPlugin()
def test_extension_exists(self):
@ -124,7 +124,7 @@ class TestAPIExtensionDescriptor(base.BaseTestCase):
OPTIONAL_EXTENSIONS = ['fw']
def setUp(self):
super(TestAPIExtensionDescriptor, self).setUp()
super().setUp()
self.extn = _APIDefinition()
self.empty_extn = _EmptyAPIDefinition()
self.useFixture(fixture.APIDefinitionFixture(self))

View File

@ -159,13 +159,13 @@ class TestAttributeValidation(base.BaseTestCase):
def test_validate_not_empty_string(self):
msg = validators.validate_not_empty_string(' ', None)
self.assertEqual(u"' ' Blank strings are not permitted", msg)
self.assertEqual("' ' Blank strings are not permitted", msg)
msg = validators.validate_not_empty_string(123, None)
self.assertEqual(u"'123' is not a valid string", msg)
self.assertEqual("'123' is not a valid string", msg)
def test_validate_not_empty_string_or_none(self):
msg = validators.validate_not_empty_string_or_none(' ', None)
self.assertEqual(u"' ' Blank strings are not permitted", msg)
self.assertEqual("' ' Blank strings are not permitted", msg)
msg = validators.validate_not_empty_string_or_none(None, None)
self.assertIsNone(msg)
@ -231,7 +231,7 @@ class TestAttributeValidation(base.BaseTestCase):
max_len = 4
msg = validators.validate_oneline_not_empty_string(data, max_len)
self.assertEqual(
"'%s' exceeds maximum length of %s" % (data, max_len),
"'{}' exceeds maximum length of {}".format(data, max_len),
msg)
data = "First line\nsecond line"
@ -260,7 +260,7 @@ class TestAttributeValidation(base.BaseTestCase):
msg = validators.validate_oneline_not_empty_string_or_none(
data, max_len)
self.assertEqual(
"'%s' exceeds maximum length of %s" % (data, max_len),
"'{}' exceeds maximum length of {}".format(data, max_len),
msg)
data = "First line\nsecond line"
@ -803,11 +803,11 @@ class TestAttributeValidation(base.BaseTestCase):
def test_validate_subnet_list(self):
msg = validators.validate_subnet_list('abc')
self.assertEqual(u"'abc' is not a list", msg)
self.assertEqual("'abc' is not a list", msg)
msg = validators.validate_subnet_list(['10.1.0.0/24',
'10.2.0.0/24',
'10.1.0.0/24'])
self.assertEqual(u"Duplicate items in the list: '10.1.0.0/24'", msg)
self.assertEqual("Duplicate items in the list: '10.1.0.0/24'", msg)
cidrs = ['10.1.0.0/24', '10.2.0.0']
msg = validators.validate_subnet_list(cidrs)
@ -1278,13 +1278,13 @@ class TestPortRangeValidation(base.BaseTestCase):
def test_invalid_port_specific_range(self):
result = validators.validate_port_range_or_none("4:500000",
[1, 65535])
self.assertEqual(u"Invalid port: 500000", result)
self.assertEqual("Invalid port: 500000", result)
def test_invalid_port_for_specific_range(self):
result = validators.validate_port_range_or_none("0:10",
[1, 65535])
self.assertEqual(u"Invalid port: 0, the port must be in"
u" the range [1, 65535]", result)
self.assertEqual("Invalid port: 0, the port must be in"
" the range [1, 65535]", result)
def test_valid_port(self):
result = validators.validate_port_range_or_none("80")
@ -1302,43 +1302,43 @@ class TestPortRangeValidation(base.BaseTestCase):
def test_port_too_high(self):
result = validators.validate_port_range_or_none("99999")
self.assertEqual(u"Invalid port: 99999", result)
self.assertEqual("Invalid port: 99999", result)
def test_port_too_low(self):
result = validators.validate_port_range_or_none("-1")
self.assertEqual(u"Invalid port: -1", result)
self.assertEqual("Invalid port: -1", result)
def test_range_too_high(self):
result = validators.validate_port_range_or_none("80:99999")
self.assertEqual(u"Invalid port: 99999", result)
self.assertEqual("Invalid port: 99999", result)
def test_range_too_low(self):
result = validators.validate_port_range_or_none("-1:8888")
self.assertEqual(u"Invalid port: -1", result)
self.assertEqual("Invalid port: -1", result)
def test_range_wrong_way(self):
# NOTE(huntxu): This case would fail when ports are compared as
# strings, since '1111' < '9'.
result = validators.validate_port_range_or_none("1111:9")
self.assertEqual(u"First port in a port range must be lower than the "
self.assertEqual("First port in a port range must be lower than the "
"second port", result)
def test_range_invalid(self):
result = validators.validate_port_range_or_none("DEAD:BEEF")
self.assertEqual(u"Invalid port: DEAD", result)
self.assertEqual("Invalid port: DEAD", result)
def test_range_bad_input(self):
result = validators.validate_port_range_or_none(['a', 'b', 'c'])
self.assertEqual(u"Invalid port: ['a', 'b', 'c']", result)
self.assertEqual("Invalid port: ['a', 'b', 'c']", result)
def test_range_colon(self):
result = validators.validate_port_range_or_none(":")
self.assertEqual(u"Port range must be two integers separated by a "
self.assertEqual("Port range must be two integers separated by a "
"colon", result)
def test_too_many_colons(self):
result = validators.validate_port_range_or_none("80:888:8888")
self.assertEqual(u"Port range must be two integers separated by a "
self.assertEqual("Port range must be two integers separated by a "
"colon", result)
@ -1390,7 +1390,7 @@ class TestAnyKeySpecs(base.BaseTestCase):
class TestServicePluginType(base.BaseTestCase):
def setUp(self):
super(TestServicePluginType, self).setUp()
super().setUp()
self._plugins = directory._PluginDirectory()
self._plugins.add_plugin('stype', mock.Mock())
self.useFixture(fixture.PluginDirectoryFixture(

View File

@ -30,7 +30,7 @@ PRI_MED = 5000
PRI_LOW = 10000
class ObjectWithCallback(object):
class ObjectWithCallback:
def __init__(self):
self.counter = 0
@ -77,7 +77,7 @@ def callback_3(resource, event, trigger, payload):
class CallBacksManagerTestCase(base.BaseTestCase):
def setUp(self):
super(CallBacksManagerTestCase, self).setUp()
super().setUp()
self.manager = manager.CallbacksManager()
self.event_payload = events.EventPayload(object())
callback_1.counter = 0

View File

@ -27,7 +27,7 @@ PRI_CALLBACK = 20
@registry.has_registry_receivers
class ObjectWithDecoratedCallback(object):
class ObjectWithDecoratedCallback:
def __init__(self):
self.counter = 0
@ -39,9 +39,9 @@ class ObjectWithDecoratedCallback(object):
self.counter += 1
class MixinWithNew(object):
class MixinWithNew:
def __new__(cls):
i = super(MixinWithNew, cls).__new__(cls)
i = super().__new__(cls)
i.new_called = True
return i
@ -51,7 +51,7 @@ class AnotherObjectWithDecoratedCallback(ObjectWithDecoratedCallback,
MixinWithNew):
def __init__(self):
super(AnotherObjectWithDecoratedCallback, self).__init__()
super().__init__()
self.counter2 = 0
@registry.receives(resources.NETWORK, [events.AFTER_DELETE], PRI_CALLBACK)
@ -60,7 +60,7 @@ class AnotherObjectWithDecoratedCallback(ObjectWithDecoratedCallback,
@registry.has_registry_receivers
class CallbackClassWithParameters(object):
class CallbackClassWithParameters:
def __init__(self, dummy):
pass
@ -122,7 +122,7 @@ class CallBacksManagerTestCase(base.BaseTestCase):
class TestCallbackRegistryDispatching(base.BaseTestCase):
def setUp(self):
super(TestCallbackRegistryDispatching, self).setUp()
super().setUp()
self.callback_manager = mock.Mock()
self.registry_fixture = fixture.CallbackRegistryFixture(
callback_manager=self.callback_manager)

View File

@ -22,5 +22,5 @@ from neutron_lib.tests import _base as base
class SqlTestCase(base.BaseTestCase):
def setUp(self):
super(SqlTestCase, self).setUp()
super().setUp()
self.useFixture(fixture.SqlFixture())

View File

@ -140,7 +140,7 @@ class TestDeadLockDecorator(_base.BaseTestCase):
10, 10, 10, 10, 10,
10, 10, 10, 10, 10]
class FakeTime(object):
class FakeTime:
def __init__(self):
self.counter = 0

View File

@ -27,7 +27,7 @@ class TestTable(model_base.BASEV2, model_base.HasProject,
class TestModelBase(db_base.SqlTestCase):
def setUp(self):
super(TestModelBase, self).setUp()
super().setUp()
self.ctx = context.Context('user', 'project')
self.session = self.ctx.session

View File

@ -26,7 +26,7 @@ from neutron_lib.utils import helpers
class TestHooks(_base.BaseTestCase):
def setUp(self):
super(TestHooks, self).setUp()
super().setUp()
self.useFixture(fixture.DBQueryHooksFixture())
def _mock_hook(self, x):

View File

@ -17,7 +17,7 @@ from neutron_lib import fixture
@resource_extend.has_resource_extenders
class _DBExtender(object):
class _DBExtender:
@resource_extend.extends('ExtendedA')
def _extend_a(self, resp, db_obj):
@ -38,7 +38,7 @@ class TestResourceExtendClass(base.BaseTestCase):
class TestResourceExtend(base.BaseTestCase):
def setUp(self):
super(TestResourceExtend, self).setUp()
super().setUp()
self.useFixture(fixture.DBResourceExtendFixture())
def test_register_funcs(self):

View File

@ -24,7 +24,7 @@ from neutron_lib.tests import _base as base
class StandardAttrTestCase(base.BaseTestCase):
def setUp(self):
super(StandardAttrTestCase, self).setUp()
super().setUp()
self.addCleanup(gc.collect)
def _make_decl_base(self):

View File

@ -28,7 +28,7 @@ FakeMessage = collections.namedtuple('Message',
'event_type', 'payload'])
class FakeNotifier(object):
class FakeNotifier:
def __init__(self, transport, publisher_id=None,
driver=None, topics=None,

View File

@ -100,11 +100,11 @@ class HackingTestCase(base.BaseTestCase):
def test_no_log_translations(self):
for log in tc._all_log_levels:
for hint in tc._all_hints:
bad = 'LOG.%s(%s("Bad"))' % (log, hint)
bad = 'LOG.{}({}("Bad"))'.format(log, hint)
self.assertEqual(
1, len(list(tc.no_translate_logs(bad, 'f'))))
# Catch abuses when used with a variable and not a literal
bad = 'LOG.%s(%s(msg))' % (log, hint)
bad = 'LOG.{}({}(msg))'.format(log, hint)
self.assertEqual(
1, len(list(tc.no_translate_logs(bad, 'f'))))
# Do not do validations in tests

View File

@ -24,7 +24,7 @@ from neutron_lib.tests import tools
from neutron_lib.utils import net
class TestField(object):
class TestField:
def test_coerce_good_values(self):
for in_val, out_val in self.coerce_good_values:
@ -47,7 +47,7 @@ class TestField(object):
self.assertEqual(prim, jsonutils.loads(jsencoded))
def test_from_primitive(self):
class ObjectLikeThing(object):
class ObjectLikeThing:
_context = 'context'
for prim_val, out_val in self.from_primitive_values:
@ -64,7 +64,7 @@ class TestField(object):
class IPV6ModeEnumFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(IPV6ModeEnumFieldTest, self).setUp()
super().setUp()
self.field = common_types.IPV6ModeEnumField()
self.coerce_good_values = [(mode, mode)
for mode in const.IPV6_MODES]
@ -79,7 +79,7 @@ class IPV6ModeEnumFieldTest(test_base.BaseTestCase, TestField):
class DscpMarkFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(DscpMarkFieldTest, self).setUp()
super().setUp()
self.field = common_types.DscpMarkField()
self.coerce_good_values = [(val, val)
for val in const.VALID_DSCP_MARKS]
@ -94,7 +94,7 @@ class DscpMarkFieldTest(test_base.BaseTestCase, TestField):
class IPNetworkPrefixLenFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(IPNetworkPrefixLenFieldTest, self).setUp()
super().setUp()
self.field = common_types.IPNetworkPrefixLenField()
self.coerce_good_values = [(x, x) for x in (0, 32, 128, 42)]
self.coerce_bad_values = ['len', '1', 129, -1]
@ -108,7 +108,7 @@ class IPNetworkPrefixLenFieldTest(test_base.BaseTestCase, TestField):
class MACAddressFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(MACAddressFieldTest, self).setUp()
super().setUp()
self.field = common_types.MACAddressField()
mac1 = tools.get_random_EUI()
mac2 = tools.get_random_EUI()
@ -131,7 +131,7 @@ class MACAddressFieldTest(test_base.BaseTestCase, TestField):
class IPNetworkFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(IPNetworkFieldTest, self).setUp()
super().setUp()
self.field = common_types.IPNetworkField()
addrs = [
tools.get_random_ip_network(version=ip_version)
@ -156,7 +156,7 @@ class IPNetworkFieldTest(test_base.BaseTestCase, TestField):
class IPVersionEnumFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(IPVersionEnumFieldTest, self).setUp()
super().setUp()
self.field = common_types.IPVersionEnumField()
self.coerce_good_values = [(val, val)
for val in const.IP_ALLOWED_VERSIONS]
@ -171,7 +171,7 @@ class IPVersionEnumFieldTest(test_base.BaseTestCase, TestField):
class FlowDirectionEnumFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(FlowDirectionEnumFieldTest, self).setUp()
super().setUp()
self.field = common_types.FlowDirectionEnumField()
self.coerce_good_values = [(val, val)
for val in const.VALID_DIRECTIONS]
@ -186,7 +186,7 @@ class FlowDirectionEnumFieldTest(test_base.BaseTestCase, TestField):
class FlowDirectionAndAnyEnumFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(FlowDirectionAndAnyEnumFieldTest, self).setUp()
super().setUp()
self.field = common_types.FlowDirectionAndAnyEnumField()
self.coerce_good_values = [
(val, val) for val in const.VALID_DIRECTIONS_AND_ANY]
@ -201,7 +201,7 @@ class FlowDirectionAndAnyEnumFieldTest(test_base.BaseTestCase, TestField):
class PortRangesFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(PortRangesFieldTest, self).setUp()
super().setUp()
self.field = common_types.PortRangesField()
self.coerce_good_values = [(val, val) for val in (
'80:80', '80:90', '80', 80)]
@ -214,7 +214,7 @@ class PortRangesFieldTest(test_base.BaseTestCase, TestField):
class DomainNameFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(DomainNameFieldTest, self).setUp()
super().setUp()
self.field = common_types.DomainNameField()
self.coerce_good_values = [
(val, val)
@ -231,7 +231,7 @@ class DomainNameFieldTest(test_base.BaseTestCase, TestField):
class EtherTypeEnumFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(EtherTypeEnumFieldTest, self).setUp()
super().setUp()
self.field = common_types.EtherTypeEnumField()
self.coerce_good_values = [(val, val)
for val in const.VALID_ETHERTYPES]
@ -246,7 +246,7 @@ class EtherTypeEnumFieldTest(test_base.BaseTestCase, TestField):
class IpProtocolEnumFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(IpProtocolEnumFieldTest, self).setUp()
super().setUp()
self.field = common_types.IpProtocolEnumField()
self.coerce_good_values = [
(val, val)
@ -266,7 +266,7 @@ class IpProtocolEnumFieldTest(test_base.BaseTestCase, TestField):
class UUIDFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(UUIDFieldTest, self).setUp()
super().setUp()
self.field = common_types.UUIDField()
self.coerce_good_values = [
('f1d9cb3f-c263-45d3-907c-d12a9ef1629e',
@ -286,7 +286,7 @@ class UUIDFieldTest(test_base.BaseTestCase, TestField):
class DictOfMiscValuesFieldTest(test_base.BaseTestCase, TestField):
def setUp(self):
super(DictOfMiscValuesFieldTest, self).setUp()
super().setUp()
self.field = common_types.DictOfMiscValues
test_dict_1 = {'a': True,
'b': 1.23,
@ -315,7 +315,7 @@ class DictOfMiscValuesFieldTest(test_base.BaseTestCase, TestField):
class NetworkSegmentRangeNetworkTypeEnumFieldTest(test_base.BaseTestCase,
TestField):
def setUp(self):
super(NetworkSegmentRangeNetworkTypeEnumFieldTest, self).setUp()
super().setUp()
self.field = common_types.NetworkSegmentRangeNetworkTypeEnumField()
self.coerce_good_values = [(val, val)
for val in [const.TYPE_VLAN,

View File

@ -20,7 +20,7 @@ class TestUtils(base.BaseTestCase):
def test_get_objects_with_filters_not_in(self):
class FakeColumn(object):
class FakeColumn:
def __init__(self, column):
self.column = column
@ -45,7 +45,7 @@ class TestUtils(base.BaseTestCase):
def test_get_objects_with_filters_not_equal(self):
class FakeColumn(object):
class FakeColumn:
def __init__(self, column):
self.column = column

View File

@ -46,7 +46,7 @@ INVENTORY = {
class TestNoAuthClient(base.BaseTestCase):
def setUp(self):
super(TestNoAuthClient, self).setUp()
super().setUp()
self.noauth_client = place_client.NoAuthClient('placement/')
self.body_json = jsonutils.dumps({'name': 'foo'})
self.uuid = '42'
@ -79,7 +79,7 @@ class TestNoAuthClient(base.BaseTestCase):
class TestPlacementAPIClientNoAuth(base.BaseTestCase):
def setUp(self):
super(TestPlacementAPIClientNoAuth, self).setUp()
super().setUp()
self.config = mock.Mock()
@mock.patch('neutron_lib.placement.client.NoAuthClient', autospec=True)
@ -105,7 +105,7 @@ class TestPlacementAPIClientNoAuth(base.BaseTestCase):
class TestPlacementAPIClient(base.BaseTestCase):
def setUp(self):
super(TestPlacementAPIClient, self).setUp()
super().setUp()
config = mock.Mock()
config.region_name = 'region_name'
self.openstack_api_version = (

View File

@ -22,7 +22,7 @@ from neutron_lib.tests import _base as base
class TestPlacementUtils(base.BaseTestCase):
def setUp(self):
super(TestPlacementUtils, self).setUp()
super().setUp()
self._uuid_ns = uuid.UUID('94fedd4d-1ce0-4bb3-9c9a-c9c0f56de154')

View File

@ -68,7 +68,7 @@ class DirectoryTestCase(base.BaseTestCase):
class PluginDirectoryTestCase(base.BaseTestCase):
def setUp(self):
super(PluginDirectoryTestCase, self).setUp()
super().setUp()
self.plugin_directory = directory._PluginDirectory()
def test_add_plugin(self):

View File

@ -21,7 +21,7 @@ from neutron_lib.tests import _base as base
class TestPolicyEnforcer(base.BaseTestCase):
def setUp(self):
super(TestPolicyEnforcer, self).setUp()
super().setUp()
# Isolate one _ROLE_ENFORCER per test case
mock.patch.object(policy_engine, '_ROLE_ENFORCER', None).start()

View File

@ -23,7 +23,7 @@ class _Worker(base.WorkerBase):
class Test_WorkerSupportServiceMixin(test_base.BaseTestCase):
def setUp(self):
super(Test_WorkerSupportServiceMixin, self).setUp()
super().setUp()
self.worker = _Worker()
def test_allocate_workers(self):
@ -65,7 +65,7 @@ class TestPluginInterface(test_base.BaseTestCase):
self.assertTrue(issubclass(A, B))
def test_issubclass_hook_class_without_abstract_methods(self):
class A(object):
class A:
def f(self):
pass

View File

@ -23,7 +23,7 @@ from neutron_lib.tests import _base
class TestNeutronContext(_base.BaseTestCase):
def setUp(self):
super(TestNeutronContext, self).setUp()
super().setUp()
db_api = 'neutron_lib.db.api.get_writer_session'
self._db_api_session_patcher = mock.patch(db_api)
self.db_api_session = self._db_api_session_patcher.start()

View File

@ -31,7 +31,7 @@ from neutron_lib.tests.unit.api import test_attributes
class PluginDirectoryFixtureTestCase(base.BaseTestCase):
def setUp(self):
super(PluginDirectoryFixtureTestCase, self).setUp()
super().setUp()
self.directory = mock.Mock()
self.useFixture(fixture.PluginDirectoryFixture(
plugin_directory=self.directory))
@ -44,7 +44,7 @@ class PluginDirectoryFixtureTestCase(base.BaseTestCase):
class CallbackRegistryFixtureTestCase(base.BaseTestCase):
def setUp(self):
super(CallbackRegistryFixtureTestCase, self).setUp()
super().setUp()
self.manager = mock.Mock()
self.useFixture(fixture.CallbackRegistryFixture(
callback_manager=self.manager))
@ -57,7 +57,7 @@ class CallbackRegistryFixtureTestCase(base.BaseTestCase):
class SqlFixtureTestCase(base.BaseTestCase):
def setUp(self):
super(SqlFixtureTestCase, self).setUp()
super().setUp()
options.set_defaults(
cfg.CONF,
connection='sqlite://')

View File

@ -28,7 +28,7 @@ CONF = cfg.CONF
class TestRPC(base.BaseTestCase):
def setUp(self):
super(TestRPC, self).setUp()
super().setUp()
self.useFixture(fixture.RPCFixture())
@mock.patch.object(rpc, 'RequestContextSerializer')
@ -133,7 +133,7 @@ class TestRPC(base.BaseTestCase):
class TestRequestContextSerializer(base.BaseTestCase):
def setUp(self):
super(TestRequestContextSerializer, self).setUp()
super().setUp()
self.mock_base = mock.Mock()
self.ser = rpc.RequestContextSerializer(self.mock_base)
self.ser_null = rpc.RequestContextSerializer(None)
@ -214,7 +214,7 @@ class ServiceTestCase(base.BaseTestCase):
# the class cannot be based on BaseTestCase since it mocks rpc.Connection
def setUp(self):
super(ServiceTestCase, self).setUp()
super().setUp()
self.host = 'foo'
self.topic = 'neutron-agent'
@ -246,7 +246,7 @@ class ServiceTestCase(base.BaseTestCase):
class TimeoutTestCase(base.BaseTestCase):
def setUp(self):
super(TimeoutTestCase, self).setUp()
super().setUp()
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake://'
@ -406,7 +406,7 @@ class TimeoutTestCase(base.BaseTestCase):
class CastExceptionTestCase(base.BaseTestCase):
def setUp(self):
super(CastExceptionTestCase, self).setUp()
super().setUp()
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake://'
@ -428,7 +428,7 @@ class CastExceptionTestCase(base.BaseTestCase):
class TestConnection(base.BaseTestCase):
def setUp(self):
super(TestConnection, self).setUp()
super().setUp()
self.conn = rpc.Connection()
@mock.patch.object(messaging, 'Target')

View File

@ -38,14 +38,14 @@ class _BaseWorker(worker.BaseWorker):
class _ProcWorker(_BaseWorker):
def __init__(self, worker_process_count=1, set_proctitle='on'):
super(_ProcWorker, self).__init__(worker_process_count, set_proctitle)
super().__init__(worker_process_count, set_proctitle)
self._my_pid = -1 # make it appear to be a separate process
class TestBaseWorker(base.BaseTestCase):
def setUp(self):
super(TestBaseWorker, self).setUp()
super().setUp()
self._reg = mock.Mock()
self.useFixture(fixture.CallbackRegistryFixture(
callback_manager=self._reg))

View File

@ -21,7 +21,7 @@ from neutron_lib.utils import file
class TestReplaceFile(base.BaseTestCase):
def setUp(self):
super(TestReplaceFile, self).setUp()
super().setUp()
temp_dir = self.get_default_temp_dir().path
self.file_name = os.path.join(temp_dir, "new_file")
self.data = "data to copy"

View File

@ -19,7 +19,7 @@ from neutron_lib.tests import _base as base
from neutron_lib.utils import runtime
class _DummyDriver(object):
class _DummyDriver:
driver = mock.sentinel.dummy_driver

View File

@ -97,7 +97,7 @@ def dict2str(dic):
:returns: The dict in str representation that is a k=v command list for
each item in dic.
"""
return ','.join("%s=%s" % (key, val)
return ','.join("{}={}".format(key, val)
for key, val in sorted(dic.items()))
@ -135,8 +135,8 @@ def diff_list_of_dict(old_list, new_list):
:returns: A tuple where the first item is a list of the added dicts in
the diff and the second item is the removed dicts.
"""
new_set = set([dict2str(i) for i in new_list])
old_set = set([dict2str(i) for i in old_list])
new_set = {dict2str(i) for i in new_list}
old_set = {dict2str(i) for i in old_list}
added = new_set - old_set
removed = old_set - new_set
return [str2dict(a) for a in added], [str2dict(r) for r in removed]

View File

@ -88,7 +88,7 @@ def is_port_trusted(port):
constants.DEVICE_OWNER_NETWORK_PREFIX)
class _AuthenticBase(object):
class _AuthenticBase:
def __init__(self, addr, **kwargs):
super().__init__(addr, **kwargs)
self._initial_value = addr

View File

@ -30,7 +30,7 @@ SYNCHRONIZED_PREFIX = 'neutron-'
synchronized = lockutils.synchronized_with_prefix(SYNCHRONIZED_PREFIX)
class NamespacedPlugins(object):
class NamespacedPlugins:
"""Wraps a stevedore plugin namespace to load/access its plugins."""
def __init__(self, namespace):

View File

@ -15,7 +15,7 @@
import abc
class BaseChecks(object, metaclass=abc.ABCMeta):
class BaseChecks(metaclass=abc.ABCMeta):
"""Base class providing upgrade checks.

View File

@ -93,7 +93,7 @@ class BaseWorker(service.ServiceBase):
if not desc:
desc = self.__class__.__name__
proctitle = "%s: %s" % (name, desc)
proctitle = "{}: {}".format(name, desc)
if self._set_proctitle == "on":
proctitle += " (%s)" % self._parent_proctitle