pre-commit: Migrate to ruff, enable autopep8

This allows us to replace bandit and flake8. It also ensures we run
pyupgrade-like checks continuously to prevent adding Python < 3.10 style
syntax in the future.

As running the pep8 check locally ran autopep8, a number of
changes were made automatically by the tools for f-strings,
etc. Some manual fix-up was required.

Change-Id: I9919ec0f477abe55d7eac16f2ebe3ab983ee8799
Signed-off-by: Brian Haley <haleyb.dev@gmail.com>
This commit is contained in:
Brian Haley
2026-01-08 14:35:23 -05:00
parent 5cf39169a4
commit 19f92f64f4
45 changed files with 328 additions and 308 deletions

View File

@@ -15,26 +15,29 @@ repos:
- id: check-json
- id: check-merge-conflict
- id: debug-statements
- id: check-yaml
files: .*\.(yaml|yml)$
- repo: https://github.com/PyCQA/bandit
rev: 1.8.3
hooks:
- id: bandit
# B104: Possible binding to all interfaces
# B110: Try, Except, Pass detected
# B303: Blacklist use of insecure MD2, MD4, MD5, or SHA1 hash functions
# B311: Standard pseudo-random generators are not suitable for security/cryptographic purpose
args: ['-n5', '-sB104,B303,B311']
args: ['-n5', '-sB104,B110,B303,B311']
files: 'neutron_lib/'
exclude: 'neutron_lib/tests'
- repo: https://github.com/Lucas-C/pre-commit-hooks
- repo: https://github.com/lucas-c/pre-commit-hooks
rev: v1.5.5
hooks:
- id: remove-tabs
exclude: '.*\.(svg)$'
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.15.0
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.12.1
hooks:
- id: mypy
pass_filenames: false
- id: ruff-check
args: ['--fix', '--unsafe-fixes']
- repo: https://opendev.org/openstack/hacking
rev: 7.0.0
hooks:
@@ -43,6 +46,16 @@ repos:
- flake8-import-order~=0.18.2
- neutron
exclude: '^(doc|releasenotes|tools)/.*$'
- repo: https://github.com/hhatto/autopep8
rev: v2.3.2
hooks:
- id: autopep8
files: '^.*\.py$'
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.16.1
hooks:
- id: mypy
pass_filenames: false
- repo: local
hooks:
- id: check-unit-test-structure

View File

@@ -1971,7 +1971,6 @@ agent:
in: body
required: true
type: object
type: string
agent_resources_synced:
description: |
The value ``null`` means no resource view synchronization to Placement

View File

@@ -52,7 +52,7 @@ templates_path = []
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8'
# source_encoding = 'utf-8'
# The master doctree document.
master_doc = 'index'
@@ -63,13 +63,13 @@ copyright = '2015-present, OpenStack Foundation.'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# today_fmt = '%B %d, %Y'
# List of documents that shouldn't be included in the build.
# unused_docs = []
@@ -79,14 +79,14 @@ copyright = '2015-present, OpenStack Foundation.'
exclude_trees = []
# The reST default role (for this markup: `text`) to use for all documents.
#default_role = None
# default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
@@ -120,26 +120,26 @@ html_theme = 'openstackdocs'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = ['_theme']
# html_theme_path = ['_theme']
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
@@ -148,46 +148,46 @@ html_theme = 'openstackdocs'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# html_additional_pages = {}
# If false, no module index is generated.
#html_use_modindex = True
# html_use_modindex = True
# If false, no index is generated.
#html_use_index = True
# html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# html_show_sourcelink = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# html_use_opensearch = ''
# If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = ''
# html_file_suffix = ''
# Output file base name for HTML help builder.
#htmlhelp_basename = 'neutronlibdoc'
# htmlhelp_basename = 'neutronlibdoc'
# -- Options for LaTeX output ------------------------------------------------
# The paper size ('letter' or 'a4').
#latex_paper_size = 'letter'
# latex_paper_size = 'letter'
# The font size ('10pt', '11pt' or '12pt').
#latex_font_size = '10pt'
# latex_font_size = '10pt'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author,
@@ -199,20 +199,20 @@ latex_documents = [
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# latex_use_parts = False
# Additional stuff for the LaTeX preamble.
#latex_preamble = ''
# latex_preamble = ''
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# latex_appendices = []
# If false, no module index is generated.
#latex_use_modindex = True
# latex_use_modindex = True
latex_elements = {
'makeindex': '',

View File

@@ -65,5 +65,5 @@ def get_topic_name(prefix, table, operation, host=None):
:returns: The topic name.
"""
if host:
return '{}-{}-{}.{}'.format(prefix, table, operation, host)
return '{}-{}-{}'.format(prefix, table, operation)
return f'{prefix}-{table}-{operation}.{host}'
return f'{prefix}-{table}-{operation}'

View File

@@ -218,7 +218,7 @@ def convert_ip_to_canonical_format(value):
ip = netaddr.IPAddress(value)
if ip.version == constants.IP_VERSION_6:
return str(ip.format(dialect=netaddr.ipv6_compact))
except Exception: # nosec B110
except Exception: # noqa: S110
# netaddr may raise all kinds of exceptions (ValueError,
# AttributeError...) if the input is not a valid IP address. Instead of
# catching them one by one, just catch all exceptions at once.

View File

@@ -19,8 +19,8 @@ from neutron_lib.db import constants as db_constants
ADDRESS_SCOPE = 'address_scope'
ADDRESS_SCOPE_ID = 'address_scope_id'
IPV4_ADDRESS_SCOPE = 'ipv4_%s' % ADDRESS_SCOPE
IPV6_ADDRESS_SCOPE = 'ipv6_%s' % ADDRESS_SCOPE
IPV4_ADDRESS_SCOPE = f'ipv4_{ADDRESS_SCOPE}'
IPV6_ADDRESS_SCOPE = f'ipv6_{ADDRESS_SCOPE}'
ALIAS = 'address-scope'
IS_SHIM_EXTENSION = False

View File

@@ -28,7 +28,7 @@ DESCRIPTION = ("Discover and advertise routes for Neutron prefixes "
UPDATED_TIMESTAMP = '2016-05-10T15:37:00-00:00'
BGP_SPEAKER_RESOURCE_NAME = 'bgp-speaker'
BGP_SPEAKER_BODY_KEY_NAME = 'bgp_speaker'
BGP_SPEAKERS = '%ss' % BGP_SPEAKER_BODY_KEY_NAME
BGP_SPEAKERS = f'{BGP_SPEAKER_BODY_KEY_NAME}s'
BGP_PEER_BODY_KEY_NAME = 'bgp_peer'

View File

@@ -27,14 +27,13 @@ UINT16_REGEX = (r'(0|[1-9]\d{0,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}'
# Regular expression to validate 8 bits unsigned int
UINT8_REGEX = (r'(0|[1-9]\d{0,1}|1\d{2}|2[0-4]\d|25[0-5])')
# Regular expression to validate IPv4 address
IP4_REGEX = (r'(%s\.%s\.%s\.%s)') % (UINT8_REGEX, UINT8_REGEX, UINT8_REGEX,
UINT8_REGEX)
IP4_REGEX = (rf'({UINT8_REGEX}\.{UINT8_REGEX}\.{UINT8_REGEX}\.{UINT8_REGEX})')
# Regular expression to validate Route Target list format
# Support of the Type 0, Type 1 and Type 2, cf. chapter 4.2 in RFC 4364
# Also validates Route Distinguisher list format
RTRD_REGEX = (r'^(%s:%s|%s:%s|%s:%s)$') % (UINT16_REGEX, UINT32_REGEX,
IP4_REGEX, UINT16_REGEX,
UINT32_REGEX, UINT16_REGEX)
RTRD_REGEX = (
rf'^({UINT16_REGEX}:{UINT32_REGEX}|{IP4_REGEX}:'
rf'{UINT16_REGEX}|{UINT32_REGEX}:{UINT16_REGEX})$')
# The alias of the extension.
ALIAS = 'bgpvpn'

View File

@@ -31,7 +31,7 @@ UPDATED_TIMESTAMP = '2013-03-28T10:00:00-00:00'
RESOURCE_NAME = l3.ROUTER
COLLECTION_NAME = l3.ROUTERS
routers: typing.Dict[str, typing.Any] = copy.deepcopy(
routers: dict[str, typing.Any] = copy.deepcopy(
l3.RESOURCE_ATTRIBUTE_MAP[COLLECTION_NAME]
)
routers[l3.EXTERNAL_GW_INFO]['validate']['type:dict_or_nodata'][

View File

@@ -30,7 +30,7 @@ UPDATED_TIMESTAMP = '2023-01-18T00:00:00-00:00'
RESOURCE_NAME = l3.ROUTER
COLLECTION_NAME = l3.ROUTERS
external_gw_info_validate: typing.Dict[str, typing.Any] = copy.deepcopy(
external_gw_info_validate: dict[str, typing.Any] = copy.deepcopy(
l3_ext_gw_mode.RESOURCE_ATTRIBUTE_MAP[COLLECTION_NAME][l3.EXTERNAL_GW_INFO]
['validate']['type:dict_or_nodata']
)

View File

@@ -30,7 +30,7 @@ UPDATED_TIMESTAMP = '2018-02-24T00:00:00-00:00'
RESOURCE_NAME = l3.ROUTER
COLLECTION_NAME = l3.ROUTERS
routers: typing.Dict[str, typing.Any] = copy.deepcopy(
routers: dict[str, typing.Any] = copy.deepcopy(
l3_ext_gw_mode.RESOURCE_ATTRIBUTE_MAP[COLLECTION_NAME]
)
routers[l3.EXTERNAL_GW_INFO]['validate']['type:dict_or_nodata'][

View File

@@ -14,7 +14,6 @@
# under the License.
import abc
import typing
from neutron_lib._i18n import _
from neutron_lib import constants
@@ -184,7 +183,7 @@ class APIExtensionDescriptor(ExtensionDescriptor):
If extension implementations need to override the default behavior of
this class they can override the respective method directly.
"""
api_definition: typing.Union[constants.Sentinel, object] = _UNSET
api_definition: constants.Sentinel | object = _UNSET
@classmethod
def _assert_api_definition(cls, attr=None):

View File

@@ -37,8 +37,8 @@ UNLIMITED = None
# Note: In order to ensure that the MAC address is unicast the first byte
# must be even.
MAC_PATTERN = "^{}[aceACE02468](:{}{{2}}){{5}}$".format(constants.HEX_ELEM,
constants.HEX_ELEM)
_HEX_ELEM = constants.HEX_ELEM
MAC_PATTERN = f"^{_HEX_ELEM}[aceACE02468](:{_HEX_ELEM}{{2}}){{5}}$"
def _verify_dict_keys(expected_keys, target_dict, strict=True):

View File

@@ -61,7 +61,7 @@ def _validate_dns_name_with_dns_domain(request_dns_name, dns_domain):
# legal size
higher_labels = dns_domain
if dns_domain:
higher_labels = '.%s' % dns_domain
higher_labels = f'.{dns_domain}'
higher_labels_len = len(higher_labels)
dns_name_len = len(request_dns_name)
if not request_dns_name.endswith('.'):
@@ -97,7 +97,7 @@ def _get_dns_domain_config():
return ''
if cfg.CONF.dns_domain.endswith('.'):
return cfg.CONF.dns_domain
return '%s.' % cfg.CONF.dns_domain
return f'{cfg.CONF.dns_domain}.'
def _get_request_dns_name(dns_name):

View File

@@ -56,8 +56,7 @@ class NotificationError:
self._cancellable = cancellable
def __str__(self):
return 'Callback {} failed with "{}"'.format(
self.callback_id, self.error)
return f'Callback {self.callback_id} failed with "{self.error}"'
@property
def is_cancellable(self):

View File

@@ -102,7 +102,7 @@ class CallbacksManager:
if not self._index[callback_id]:
del self._index[callback_id]
else:
value = '{},{}'.format(resource, event)
value = f'{resource},{event}'
raise exceptions.Invalid(element='resource,event', value=value)
def unsubscribe_by_resource(self, callback, resource):

View File

@@ -67,7 +67,7 @@ def receives(resource, events, priority=priority_group.PRIORITY_DEFAULT):
@has_registry_receivers decorator to setup the __new__ method to
actually register the instance methods after initialization.
"""
if not isinstance(events, (list, tuple, set)):
if not isinstance(events, list | tuple | set):
msg = _("'events' must be a collection (list, tuple, set)")
raise AssertionError(msg)

View File

@@ -421,6 +421,7 @@ TC_QDISC_PARENTS = {'root': 0xffffffff,
class Sentinel:
"""A constant object that does not change even when copied."""
def __deepcopy__(self, memo):
# Always return the same object because this is essentially a constant.
return self

View File

@@ -142,7 +142,7 @@ def _tag_retriables_as_unretriable(f):
def _copy_if_lds(item):
"""Deepcopy lists/dicts/sets, leave everything else alone."""
return copy.deepcopy(item) if isinstance(item, (list, dict, set)) else item
return copy.deepcopy(item) if isinstance(item, list | dict | set) else item
_retry_db_errors = oslo_db_api.wrap_db_retry(
@@ -338,8 +338,8 @@ def _load_one_to_manys(session):
if relationship_attr.key not in state.dict:
getattr(new_object, relationship_attr.key)
if relationship_attr.key not in state.dict:
msg = ("Relationship %s attributes must be loaded in db "
"object %s" % (relationship_attr.key, state.dict))
msg = (f"Relationship {relationship_attr.key} attributes "
f"must be loaded in db object {state.dict}")
raise AssertionError(msg)
@@ -448,7 +448,7 @@ def _listen_for_changes(cls, key, inst):
# can look up the parent. so here we make one if it doesn't
# have it already, as is the case in this example
if not prop.back_populates:
name = "_%s_backref" % prop.key
name = f"_{prop.key}_backref"
backref_prop = orm.relationship(
prop.parent, back_populates=prop.key)

View File

@@ -94,7 +94,7 @@ class _NeutronBase(models.ModelBase):
def __repr__(self):
"""sqlalchemy based automatic __repr__ method."""
items = ['{}={!r}'.format(col.name, getattr(self, col.name))
items = [f'{col.name}={getattr(self, col.name)!r}'
for col in self.__table__.columns]
return "<{}.{}[object at {:x}] {{{}}}>".format(
self.__class__.__module__, self.__class__.__name__,

View File

@@ -90,7 +90,7 @@ def register_hook(model, name, query_hook, filter_hook,
if callable(result_filters):
result_filters = helpers.make_weak_ref(result_filters)
if rbac_actions is not None:
if isinstance(rbac_actions, (set, list, tuple)):
if isinstance(rbac_actions, set | list | tuple):
rbac_actions = set(rbac_actions)
else:
rbac_actions = {rbac_actions}

View File

@@ -125,7 +125,7 @@ def get_marker_obj(plugin, context, resource, limit, marker):
are given.
"""
if limit and marker:
return getattr(plugin, '_get_%s' % resource)(context, marker)
return getattr(plugin, f'_get_{resource}')(context, marker)
def resource_fields(resource, fields):

View File

@@ -396,6 +396,7 @@ class SubnetMismatchForPort(BadRequest):
class Invalid(NeutronException):
"""A generic base class for invalid errors."""
def __init__(self, message=None):
self.message = message
super().__init__()

View File

@@ -71,7 +71,7 @@ def use_jsonutils(logical_line, filename):
if "json." in logical_line:
json_funcs = ['dumps(', 'dump(', 'loads(', 'load(']
for f in json_funcs:
pos = logical_line.find('json.%s' % f)
pos = logical_line.find(f'json.{f}')
if pos != -1:
yield (pos, msg % {'fun': f[:-1]})
@@ -85,25 +85,25 @@ def _check_imports(regex, submatch, logical_line):
def _check_namespace_imports(failure_code, namespace, new_ns, logical_line,
message_override=None):
if message_override is not None:
msg_o = "{}: {}".format(failure_code, message_override)
msg_o = f"{failure_code}: {message_override}"
else:
msg_o = None
if _check_imports(namespace_imports_from_dot, namespace, logical_line):
msg = ("%s: '%s' must be used instead of '%s'.") % (
msg = ("{}: '{}' must be used instead of '{}'.").format(
failure_code,
logical_line.replace('%s.' % namespace, new_ns),
logical_line.replace(f'{namespace}.', new_ns),
logical_line)
return (0, msg_o or msg)
elif _check_imports(namespace_imports_from_root, namespace, logical_line):
msg = ("%s: '%s' must be used instead of '%s'.") % (
msg = ("{}: '{}' must be used instead of '{}'.").format(
failure_code,
logical_line.replace(
'from %s import ' % namespace, 'import %s' % new_ns),
f'from {namespace} import ', f'import {new_ns}'),
logical_line)
return (0, msg_o or msg)
elif _check_imports(namespace_imports_dot, namespace, logical_line):
msg = ("%s: '%s' must be used instead of '%s'.") % (
msg = ("{}: '{}' must be used instead of '{}'.").format(
failure_code,
logical_line.replace('import', 'from').replace('.', ' import '),
logical_line)

View File

@@ -55,6 +55,7 @@ class RangeConstrainedInteger(obj_fields.Integer):
class IPNetworkPrefixLen(RangeConstrainedInteger):
"""IP network (CIDR) prefix length custom Enum"""
def __init__(self, **kwargs):
super().__init__(start=0, end=lib_constants.IPv6_BITS, **kwargs)
@@ -205,6 +206,7 @@ class IntegerEnum(obj_fields.Integer):
class IPVersionEnum(IntegerEnum):
"""IP version integer Enum"""
def __init__(self, **kwargs):
super().__init__(
valid_values=lib_constants.IP_ALLOWED_VERSIONS, **kwargs)
@@ -243,6 +245,7 @@ class EtherTypeEnumField(obj_fields.AutoTypedField):
class IpProtocolEnum(obj_fields.Enum):
"""IP protocol number Enum"""
def __init__(self, **kwargs):
super().__init__(
valid_values=list(
@@ -269,6 +272,7 @@ class MACAddress(obj_fields.FieldType):
This custom field is different from the one provided by
oslo.versionedobjects library: it uses netaddr.EUI type instead of strings.
"""
def coerce(self, obj, attr, value):
if not isinstance(value, netaddr.EUI):
msg = _("Field value %s is not a netaddr.EUI") % value
@@ -340,6 +344,7 @@ class IPNetwork(obj_fields.FieldType):
oslo.versionedobjects library: it does not reset string representation for
the field.
"""
def coerce(self, obj, attr, value):
if not isinstance(value, netaddr.IPNetwork):
msg = _("Field value %s is not a netaddr.IPNetwork") % value

View File

@@ -127,19 +127,19 @@ class NoAuthClient:
raise ks_exc.HttpError
def get(self, url, endpoint_filter, **kwargs):
return self.request('{}{}'.format(self.url, url), 'GET', **kwargs)
return self.request(f'{self.url}{url}', 'GET', **kwargs)
def post(self, url, json, endpoint_filter, **kwargs):
return self.request('{}{}'.format(self.url, url), 'POST', body=json,
return self.request(f'{self.url}{url}', 'POST', body=json,
**kwargs)
def put(self, url, json, endpoint_filter, **kwargs):
resp = self.request('{}{}'.format(self.url, url), 'PUT', body=json,
resp = self.request(f'{self.url}{url}', 'PUT', body=json,
**kwargs)
return resp
def delete(self, url, endpoint_filter, **kwargs):
return self.request('{}{}'.format(self.url, url), 'DELETE', **kwargs)
return self.request(f'{self.url}{url}', 'DELETE', **kwargs)
class PlacementAPIClient:
@@ -277,7 +277,7 @@ class PlacementAPIClient:
:returns: The updated resource provider.
"""
# pylint: disable=raise-missing-from
url = '/resource_providers/%s' % resource_provider['uuid']
url = '/resource_providers/{}'.format(resource_provider['uuid'])
# update does not tolerate if the uuid is repeated in the body
update_body = resource_provider.copy()
update_body.pop('uuid')
@@ -314,7 +314,7 @@ class PlacementAPIClient:
:param resource_provider_uuid: UUID of the resource provider.
"""
url = '/resource_providers/%s' % resource_provider_uuid
url = f'/resource_providers/{resource_provider_uuid}'
self._delete(url)
@_check_placement_api_available
@@ -326,7 +326,7 @@ class PlacementAPIClient:
:returns: The Resource Provider matching the UUID.
"""
# pylint: disable=raise-missing-from
url = '/resource_providers/%s' % resource_provider_uuid
url = f'/resource_providers/{resource_provider_uuid}'
try:
return self._get(url).json()
except ks_exc.NotFound:
@@ -377,7 +377,7 @@ class PlacementAPIClient:
filters['in_tree'] = in_tree
if uuid:
filters['uuid'] = uuid
url = '{}?{}'.format(url, parse.urlencode(filters))
url = f'{url}?{parse.urlencode(filters)}'
return self._get(url).json()
@_check_placement_api_available
@@ -409,7 +409,7 @@ class PlacementAPIClient:
:returns: The updated set of inventory records.
"""
# pylint: disable=raise-missing-from
url = '/resource_providers/%s/inventories' % resource_provider_uuid
url = f'/resource_providers/{resource_provider_uuid}/inventories'
body = {
'resource_provider_generation': resource_provider_generation,
'inventories': inventories
@@ -432,8 +432,7 @@ class PlacementAPIClient:
is not found.
:returns: None.
"""
url = '/resource_providers/%s/inventories' % (
resource_provider_uuid)
url = f'/resource_providers/{resource_provider_uuid}/inventories'
try:
self._delete(url)
except ks_exc.NotFound as e:
@@ -454,8 +453,8 @@ class PlacementAPIClient:
:raises PlacementInventoryNotFound: No inventory of class.
:returns: None.
"""
url = '/resource_providers/{}/inventories/{}'.format(
resource_provider_uuid, resource_class)
url = (f'/resource_providers/{resource_provider_uuid}'
f'/inventories/{resource_class}')
try:
self._delete(url)
except ks_exc.NotFound as e:
@@ -481,8 +480,8 @@ class PlacementAPIClient:
for a resource provider.
:returns: The inventory of the resource class as a dict.
"""
url = '/resource_providers/{}/inventories/{}'.format(
resource_provider_uuid, resource_class)
url = (f'/resource_providers/{resource_provider_uuid}/'
f'inventories/{resource_class}')
try:
return self._get(url).json()
except ks_exc.NotFound as e:
@@ -515,8 +514,8 @@ class PlacementAPIClient:
server side.
:returns: The updated inventory of the resource class as a dict.
"""
url = '/resource_providers/{}/inventories/{}'.format(
resource_provider_uuid, resource_class)
url = (f'/resource_providers/{resource_provider_uuid}/'
f'inventories/{resource_class}')
body = inventory
try:
@@ -535,7 +534,7 @@ class PlacementAPIClient:
provider.
:returns: All aggregates associated with the resource provider.
"""
url = '/resource_providers/%s/aggregates' % resource_provider_uuid
url = f'/resource_providers/{resource_provider_uuid}/aggregates'
return self._put(url, aggregates).json()
@_check_placement_api_available
@@ -549,7 +548,7 @@ class PlacementAPIClient:
generation.
"""
# pylint: disable=raise-missing-from
url = '/resource_providers/%s/aggregates' % resource_provider_uuid
url = f'/resource_providers/{resource_provider_uuid}/aggregates'
try:
return self._get(url).json()
except ks_exc.NotFound:
@@ -571,7 +570,7 @@ class PlacementAPIClient:
:returns: Evaluates to True if the trait exists.
"""
# pylint: disable=raise-missing-from
url = '/traits/%s' % name
url = f'/traits/{name}'
try:
return self._get(url)
except ks_exc.NotFound:
@@ -584,7 +583,7 @@ class PlacementAPIClient:
:param name: name of the trait to create.
:returns: The Response object so you may access response headers.
"""
url = '/traits/%s' % (name)
url = f'/traits/{name}'
return self._put(url, None)
@_check_placement_api_available
@@ -596,7 +595,7 @@ class PlacementAPIClient:
:returns: None.
"""
# pylint: disable=raise-missing-from
url = '/traits/%s' % (name)
url = f'/traits/{name}'
try:
self._delete(url)
except ks_exc.NotFound:
@@ -629,7 +628,7 @@ class PlacementAPIClient:
resource provider generation.
"""
# pylint: disable=raise-missing-from
url = '/resource_providers/%s/traits' % (resource_provider_uuid)
url = f'/resource_providers/{resource_provider_uuid}/traits'
body = {
'resource_provider_generation': resource_provider_generation,
'traits': traits
@@ -657,7 +656,7 @@ class PlacementAPIClient:
with the resource provider generation.
"""
# pylint: disable=raise-missing-from
url = '/resource_providers/%s/traits' % (resource_provider_uuid)
url = f'/resource_providers/{resource_provider_uuid}/traits'
try:
return self._get(url).json()
except ks_exc.NotFound:
@@ -675,7 +674,7 @@ class PlacementAPIClient:
:returns: None.
"""
# pylint: disable=raise-missing-from
url = '/resource_providers/%s/traits' % (resource_provider_uuid)
url = f'/resource_providers/{resource_provider_uuid}/traits'
try:
self._delete(url)
except ks_exc.NotFound:
@@ -698,7 +697,7 @@ class PlacementAPIClient:
:returns: The name of resource class and its set of links.
"""
# pylint: disable=raise-missing-from
url = '/resource_classes/%s' % (name)
url = f'/resource_classes/{name}'
try:
return self._get(url).json()
except ks_exc.NotFound:
@@ -722,7 +721,7 @@ class PlacementAPIClient:
:param name: the name of the resource class to be updated or validated
:returns: None.
"""
url = '/resource_classes/%s' % name
url = f'/resource_classes/{name}'
self._put(url, None)
@_check_placement_api_available
@@ -735,7 +734,7 @@ class PlacementAPIClient:
:returns: None.
"""
# pylint: disable=raise-missing-from
url = '/resource_classes/%s' % (name)
url = f'/resource_classes/{name}'
try:
self._delete(url)
except ks_exc.NotFound:
@@ -749,7 +748,7 @@ class PlacementAPIClient:
owned by a VM, the VM uuid.
:returns: All allocation records for the consumer.
"""
url = '/allocations/%s' % consumer_uuid
url = f'/allocations/{consumer_uuid}'
return self._get(url).json()
def update_qos_allocation(self, consumer_uuid, alloc_diff):
@@ -804,5 +803,5 @@ class PlacementAPIClient:
:param allocations: Dict in the form described in placement API ref:
https://tinyurl.com/yxeuzn6l
"""
url = '/allocations/%s' % consumer_uuid
url = f'/allocations/{consumer_uuid}'
self._put(url, allocations)

View File

@@ -27,8 +27,8 @@ def physnet_trait(physnet):
:param physnet: The physnet name.
:returns: The trait name representing the physnet.
"""
return os_traits.normalize_name('{}{}'.format(
place_const.TRAIT_PREFIX_PHYSNET, physnet))
return os_traits.normalize_name(
f'{place_const.TRAIT_PREFIX_PHYSNET}{physnet}')
def vnic_type_trait(vnic_type):
@@ -37,8 +37,8 @@ def vnic_type_trait(vnic_type):
:param physnet: The vnic_type.
:returns: The trait name representing the vnic_type.
"""
return os_traits.normalize_name('{}{}'.format(
place_const.TRAIT_PREFIX_VNIC_TYPE, vnic_type))
return os_traits.normalize_name(
f'{place_const.TRAIT_PREFIX_VNIC_TYPE}{vnic_type}')
def six_uuid5(namespace, name):

View File

@@ -288,9 +288,9 @@ def get_interface_name(name, prefix='', max_len=constants.DEVICE_NAME_MAX_LEN):
namelen = max_len - len(prefix) - INTERFACE_HASH_LEN
hashed_name = hashlib.new('sha1', usedforsecurity=False)
hashed_name.update(encodeutils.to_utf8(name))
new_name = ('%(prefix)s%(truncated)s%(hash)s' %
{'prefix': prefix, 'truncated': name[0:namelen],
'hash': hashed_name.hexdigest()[0:INTERFACE_HASH_LEN]})
new_name = (
f'{prefix}{name[0:namelen]}'
f'{hashed_name.hexdigest()[0:INTERFACE_HASH_LEN]}')
LOG.info("The requested interface name %(requested_name)s exceeds the "
"%(limit)d character limitation. It was shortened to "
"%(new_name)s to fit.",

View File

@@ -139,8 +139,8 @@ class _BackingOffContextWrapper(_ContextWrapper):
# two methods with the same name in different namespaces should
# be tracked independently
if self._original_context.target.namespace:
scoped_method = '{}.{}'.format(
self._original_context.target.namespace, method)
scoped_method = (
f'{self._original_context.target.namespace}.{method}')
else:
scoped_method = method
# set the timeout from the global method timeout tracker for this
@@ -181,6 +181,7 @@ class BackingOffClient(oslo_messaging.RPCClient):
returned will track when call timeout exceptions occur and exponentially
increase the timeout for the given call method.
"""
def prepare(self, *args, **kwargs):
ctx = super().prepare(*args, **kwargs)
# don't back off contexts that explicitly set a timeout
@@ -245,7 +246,7 @@ def get_notifier(service=None, host=None, publisher_id=None):
"None. This is deprecated since 2025.1 release and "
"will be removed in one of the future releases. "
"Please always pass the 'service' argument.")
publisher_id = "{}.{}".format(service, host or cfg.CONF.host)
publisher_id = f"{service}.{host or cfg.CONF.host}"
serializer = RequestContextSerializer()
return oslo_messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer,
@@ -254,6 +255,7 @@ def get_notifier(service=None, host=None, publisher_id=None):
class RequestContextSerializer(om_serializer.Serializer):
"""Convert RPC common context into Neutron Context."""
def __init__(self, base=None):
super().__init__()
self._base = base
@@ -294,6 +296,7 @@ class Service(os_service.Service):
A service enables rpc by listening to queues based on topic and host.
"""
def __init__(self, host, topic, manager=None, serializer=None):
super().__init__()
self.host = host
@@ -328,7 +331,8 @@ class Service(os_service.Service):
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception: # nosec
except Exception as e:
LOG.debug("Ignored exception during stop: %s", str(e))
pass
super().stop()

View File

@@ -110,7 +110,7 @@ class BaseTestCase(testtools.TestCase):
if conf is None:
version_info = pbr.version.VersionInfo('neutron-lib')
cfg.CONF(args=args, project='neutron_lib',
version='%%(prog)s %s' % version_info.release_string())
version=f'%(prog)s {version_info.release_string()}')
else:
conf(args)
@@ -224,16 +224,18 @@ class BaseTestCase(testtools.TestCase):
be reported upon failure.
"""
if not isinstance(expected_subset, dict):
self.fail("expected_subset (%s) is not an instance of dict" %
type(expected_subset))
self.fail(
f"expected_subset ({type(expected_subset)}) is not "
"an instance of dict")
if not isinstance(actual_superset, dict):
self.fail("actual_superset (%s) is not an instance of dict" %
type(actual_superset))
self.fail(
f"actual_superset ({type(actual_superset)}) is not "
"an instance of dict")
for k, v in expected_subset.items():
self.assertIn(k, actual_superset)
self.assertEqual(v, actual_superset[k],
"Key %(key)s expected: %(exp)r, actual %(act)r" %
{'key': k, 'exp': v, 'act': actual_superset[k]})
f"Key {k} expected: {v!r}, actual "
f"{actual_superset[k]!r}")
def setup_config(self, args=None):
"""Tests that need a non-default config can override this method."""

View File

@@ -26,13 +26,14 @@ def _get_debugger(debugger_name):
try:
debugger = __import__(debugger_name)
except ImportError as exc:
raise ValueError("can't import %s module as a post mortem debugger" %
debugger_name) from exc
raise ValueError(
f"can't import {debugger_name} module as a "
"post mortem debugger") from exc
if 'post_mortem' in dir(debugger):
return debugger
else:
raise ValueError("%s is not a supported post mortem debugger" %
debugger_name)
raise ValueError(
f"{debugger_name} is not a supported post mortem debugger")
def _exception_handler(debugger, exc_info):

View File

@@ -13,7 +13,6 @@
import importlib.util
import os
import types
import typing
from neutron_lib.api import definitions
from neutron_lib.api.definitions import base
@@ -25,7 +24,7 @@ from neutron_lib.tests import _base as test_base
def assert_bool(tester, attribute, attribute_dict, keyword, value):
tester.assertIsInstance(
value, bool,
'{} must be a boolean for {}.'.format(keyword, attribute))
f'{keyword} must be a boolean for {attribute}.')
def assert_converter(tester, attribute, attribute_dict, keyword, value):
@@ -40,20 +39,19 @@ def assert_converter(tester, attribute, attribute_dict, keyword, value):
attribute_dict['convert_list_to'](attribute_dict['default'])
except KeyError:
if validators.is_attr_set(value) and not isinstance(
value, (str, list)):
tester.fail("Default value '%s' cannot be converted for "
"attribute %s." % (value, attribute))
value, str | list):
tester.fail(f"Default value '{value}' cannot be converted for "
f"attribute {attribute}.")
def assert_true(tester, attribute, attribute_dict, keyword, value):
tester.assertTrue(
value, '{} must be True for {}.'.format(keyword, attribute))
value, f'{keyword} must be True for {attribute}.')
def assert_validator(tester, attribute, attribute_dict, keyword, value):
tester.assertIn(list(value)[0], validators.validators,
'{} is not a known validator for {}.'.format(
value, attribute))
f'{value} is not a known validator for {attribute}.')
ASSERT_FUNCTIONS = {
@@ -76,7 +74,7 @@ ASSERT_FUNCTIONS = {
class DefinitionBaseTestCase(test_base.BaseTestCase):
extension_module: typing.Optional[types.ModuleType] = None
extension_module: types.ModuleType | None = None
extension_resources: tuple[str, ...] = ()
extension_subresources: tuple[str, ...] = ()
extension_attributes: tuple[str, ...] = ()

View File

@@ -32,38 +32,37 @@ class TestDnsValidators(base.BaseTestCase):
invalid_data = 'A' * 256
max_len = 255
expected = ("'%(data)s' not a valid PQDN or FQDN. Reason: "
"'%(data)s' exceeds the %(maxlen)s character FQDN "
"limit") % {'data': invalid_data, 'maxlen': max_len}
expected = (f"'{invalid_data}' not a valid PQDN or FQDN. Reason: "
f"'{invalid_data}' exceeds the {max_len} character FQDN "
"limit")
msg = dns.validate_dns_name(invalid_data, max_len)
self.assertEqual(expected, msg)
invalid_data = '.hostname'
expected = ("'%(data)s' not a valid PQDN or FQDN. Reason: "
"Encountered an empty component") % {'data': invalid_data}
expected = (f"'{invalid_data}' not a valid PQDN or FQDN. Reason: "
"Encountered an empty component")
msg = dns.validate_dns_name(invalid_data)
self.assertEqual(expected, msg)
invalid_data = 'hostname-'
expected = ("'%(data)s' not a valid PQDN or FQDN. Reason: "
"Name '%(data)s' must not start or end with a "
"hyphen") % {'data': invalid_data}
expected = (f"'{invalid_data}' not a valid PQDN or FQDN. Reason: "
f"Name '{invalid_data}' must not start or end with a "
"hyphen")
msg = dns.validate_dns_name(invalid_data)
self.assertEqual(expected, msg)
invalid_data = 'hostname@host'
expected = ("'%(data)s' not a valid PQDN or FQDN. Reason: "
"Name '%(data)s' must be 1-63 characters long, each of "
"which can only be alphanumeric or a "
"hyphen") % {'data': invalid_data}
expected = (f"'{invalid_data}' not a valid PQDN or FQDN. Reason: "
f"Name '{invalid_data}' must be 1-63 characters long, "
"each of which can only be alphanumeric or a "
"hyphen")
msg = dns.validate_dns_name(invalid_data)
self.assertEqual(expected, msg)
invalid_suffix = '1234'
invalid_data = 'hostname.' + invalid_suffix
expected = ("'%(data)s' not a valid PQDN or FQDN. Reason: "
"TLD '%(suffix)s' must not be all "
"numeric") % {'data': invalid_data,
'suffix': invalid_suffix}
expected = (f"'{invalid_data}' not a valid PQDN or FQDN. Reason: "
f"TLD '{invalid_suffix}' must not be all "
"numeric")
msg = dns.validate_dns_name(invalid_data)
self.assertEqual(expected, msg)
@@ -72,16 +71,13 @@ class TestDnsValidators(base.BaseTestCase):
CONF.dns_domain = invalid_domain
dns_name = 'hostname'
expected = ("The dns_name passed is a PQDN and its size is "
"'%(dns_name_len)s'. The dns_domain option in "
"neutron.conf is set to %(dns_domain)s, with a "
"length of '%(higher_labels_len)s'. When the two are "
f"'{len(dns_name)}'. The dns_domain option in "
f"neutron.conf is set to {invalid_domain}, with a "
f"length of '{len(invalid_domain) + 1}'. When the two are "
"concatenated to form a FQDN (with a '.' at the end), "
"the resulting length exceeds the maximum size "
"of '%(fqdn_max_len)s'"
) % {'dns_name_len': len(dns_name),
'dns_domain': invalid_domain,
'higher_labels_len': len(invalid_domain) + 1,
'fqdn_max_len': db_constants.FQDN_FIELD_SIZE}
f"of '{db_constants.FQDN_FIELD_SIZE}'"
)
msg = dns.validate_dns_name(dns_name)
self.assertEqual(expected, msg)
@@ -90,9 +86,9 @@ class TestDnsValidators(base.BaseTestCase):
CONF.dns_domain = dns_domain
expected = ("The dns_name passed is a FQDN. Its higher level labels "
"must be equal to the dns_domain option in neutron.conf, "
"that has been set to '%(dns_domain)s'. It must also "
f"that has been set to '{dns_domain}'. It must also "
"include one or more valid DNS labels to the left "
"of '%(dns_domain)s'") % {'dns_domain': dns_domain}
f"of '{dns_domain}'")
msg = dns.validate_dns_name(dns_name)
self.assertEqual(expected, msg)
@@ -106,24 +102,22 @@ class TestDnsValidators(base.BaseTestCase):
self.assertIsNone(msg)
invalid_data = 1234
expected = "'%s' is not a valid string" % invalid_data
expected = f"'{invalid_data}' is not a valid string"
msg = dns.validate_fip_dns_name(invalid_data)
self.assertEqual(expected, msg)
invalid_data = 'host.'
expected = ("'%s' is a FQDN. It should be a relative domain "
"name") % invalid_data
expected = (f"'{invalid_data}' is a FQDN. It should be a relative "
"domain name")
msg = dns.validate_fip_dns_name(invalid_data)
self.assertEqual(expected, msg)
length = 10
invalid_data = 'a' * length
max_len = 12
expected = ("'%(data)s' contains %(length)s characters. Adding a "
expected = (f"'{invalid_data}' contains {length} characters. Adding a "
"domain name will cause it to exceed the maximum length "
"of a FQDN of '%(max_len)s'") % {"data": invalid_data,
"length": length,
"max_len": max_len}
f"of a FQDN of '{max_len}'")
msg = dns.validate_fip_dns_name(invalid_data, max_len)
self.assertEqual(expected, msg)
@@ -137,22 +131,20 @@ class TestDnsValidators(base.BaseTestCase):
self.assertIsNone(msg)
invalid_data = 1234
expected = "'%s' is not a valid string" % invalid_data
expected = f"'{invalid_data}' is not a valid string"
msg = dns.validate_dns_domain(invalid_data)
self.assertEqual(expected, msg)
invalid_data = 'example.com'
expected = "'%s' is not a FQDN" % invalid_data
expected = f"'{invalid_data}' is not a FQDN"
msg = dns.validate_dns_domain(invalid_data)
self.assertEqual(expected, msg)
length = 9
invalid_data = 'a' * length + '.'
max_len = 11
expected = ("'%(data)s' contains %(length)s characters. Adding a "
"sub-domain will cause it to exceed the maximum length "
"of a FQDN of '%(max_len)s'") % {"data": invalid_data,
"length": length + 1,
"max_len": max_len}
expected = (f"'{invalid_data}' contains {length + 1} characters. "
"Adding a sub-domain will cause it to exceed the maximum "
f"length of a FQDN of '{max_len}'")
msg = dns.validate_dns_domain(invalid_data, max_len)
self.assertEqual(expected, msg)

View File

@@ -149,9 +149,9 @@ class TestAttributeValidation(base.BaseTestCase):
# Check that value is not comparable to valid_values and got Exception
data = 1
valid_values = '[2, 3, 4, 5]'
response = "'data' of type '%s' and 'valid_values' of type" \
" '%s' are not compatible for comparison" % (
type(data), type(valid_values))
response = (f"'data' of type '{type(data)}' and 'valid_values' of "
f"type '{type(valid_values)}' are not compatible for "
"comparison")
self.assertRaisesRegex(TypeError, response,
validators.validate_values, data,
valid_values,
@@ -231,19 +231,19 @@ class TestAttributeValidation(base.BaseTestCase):
max_len = 4
msg = validators.validate_oneline_not_empty_string(data, max_len)
self.assertEqual(
"'{}' exceeds maximum length of {}".format(data, max_len),
f"'{data}' exceeds maximum length of {max_len}",
msg)
data = "First line\nsecond line"
msg = validators.validate_oneline_not_empty_string(data, None)
self.assertEqual(
"Multi-line string is not allowed: '%s'" % data,
f"Multi-line string is not allowed: '{data}'",
msg)
data = ""
msg = validators.validate_oneline_not_empty_string(data, None)
self.assertEqual(
"'%s' Blank strings are not permitted" % data,
f"'{data}' Blank strings are not permitted",
msg)
def test_validate_oneline_not_empty_string_or_none(self):
@@ -260,19 +260,19 @@ class TestAttributeValidation(base.BaseTestCase):
msg = validators.validate_oneline_not_empty_string_or_none(
data, max_len)
self.assertEqual(
"'{}' exceeds maximum length of {}".format(data, max_len),
f"'{data}' exceeds maximum length of {max_len}",
msg)
data = "First line\nsecond line"
msg = validators.validate_oneline_not_empty_string_or_none(data, None)
self.assertEqual(
"Multi-line string is not allowed: '%s'" % data,
f"Multi-line string is not allowed: '{data}'",
msg)
data = ""
msg = validators.validate_oneline_not_empty_string(data, None)
self.assertEqual(
"'%s' Blank strings are not permitted" % data,
f"'{data}' Blank strings are not permitted",
msg)
def test_validate_boolean(self):
@@ -325,10 +325,10 @@ class TestAttributeValidation(base.BaseTestCase):
for ws in string.whitespace:
self.assertRaises(n_exc.InvalidInput,
validators.validate_no_whitespace,
'%swhitespace-at-head' % ws)
f'{ws}whitespace-at-head')
self.assertRaises(n_exc.InvalidInput,
validators.validate_no_whitespace,
'whitespace-at-tail%s' % ws)
f'whitespace-at-tail{ws}')
def test_validate_range(self):
msg = validators.validate_range(1, [1, 9])
@@ -426,35 +426,35 @@ class TestAttributeValidation(base.BaseTestCase):
ip_addr = '1111.1.1.1'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
self.assertEqual(f"'{ip_addr}' is not a valid IP address", msg)
# Depending on platform to run UTs, this case might or might not be
# an equivalent to test_validate_ip_address_bsd.
ip_addr = '1' * 59
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
self.assertEqual(f"'{ip_addr}' is not a valid IP address", msg)
ip_addr = '1.1.1.1 has whitespace'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
self.assertEqual(f"'{ip_addr}' is not a valid IP address", msg)
ip_addr = '111.1.1.1\twhitespace'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
self.assertEqual(f"'{ip_addr}' is not a valid IP address", msg)
ip_addr = '111.1.1.1\nwhitespace'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
self.assertEqual(f"'{ip_addr}' is not a valid IP address", msg)
for ws in string.whitespace:
ip_addr = '%s111.1.1.1' % ws
ip_addr = f'{ws}111.1.1.1'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
self.assertEqual(f"'{ip_addr}' is not a valid IP address", msg)
for ws in string.whitespace:
ip_addr = '111.1.1.1%s' % ws
ip_addr = f'111.1.1.1{ws}'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
self.assertEqual(f"'{ip_addr}' is not a valid IP address", msg)
def test_validate_ip_address_with_leading_zero(self):
ip_addr = '1.1.1.01'
@@ -489,7 +489,7 @@ class TestAttributeValidation(base.BaseTestCase):
msg = validators.validate_ip_address(ip_addr)
ip_address_cls.assert_called_once_with(ip_addr,
flags=netaddr.core.ZEROFILL)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
self.assertEqual(f"'{ip_addr}' is not a valid IP address", msg)
def test_validate_ip_pools(self):
pools = [[{'end': '10.0.0.254'}],
@@ -517,7 +517,7 @@ class TestAttributeValidation(base.BaseTestCase):
pools = [[{'end': '10.0.0.254', 'start': invalid_ip}]]
for pool in pools:
msg = validators.validate_ip_pools(pool)
self.assertEqual("'%s' is not a valid IP address" % invalid_ip,
self.assertEqual(f"'{invalid_ip}' is not a valid IP address",
msg)
def test_validate_fixed_ips(self):
@@ -622,7 +622,7 @@ class TestAttributeValidation(base.BaseTestCase):
ip_addr = '1111.1.1.1'
msg = validators.validate_ip_address_or_none(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
self.assertEqual(f"'{ip_addr}' is not a valid IP address", msg)
def test_uuid_pattern(self):
data = 'garbage'
@@ -718,37 +718,37 @@ class TestAttributeValidation(base.BaseTestCase):
# Invalid - abbreviated ipv4 address
cidr = "10/24"
msg = validator(cidr, None)
error = "'%s' is not a valid IP subnet" % cidr
error = f"'{cidr}' is not a valid IP subnet"
self.assertEqual(error, msg)
# Invalid - IPv4 missing mask
cidr = "10.0.2.0"
msg = validator(cidr, None)
error = "'%s' is not a valid IP subnet" % cidr
error = f"'{cidr}' is not a valid IP subnet"
self.assertEqual(error, msg)
# Valid - IPv4 with non-zero masked bits is ok
for i in range(1, 255):
cidr = "192.168.1.%s/24" % i
cidr = f"192.168.1.{i}/24"
msg = validator(cidr, None)
self.assertIsNone(msg)
# Invalid - IPv6 without final octets, missing mask
cidr = "fe80::"
msg = validator(cidr, None)
error = "'%s' is not a valid IP subnet" % cidr
error = f"'{cidr}' is not a valid IP subnet"
self.assertEqual(error, msg)
# Invalid - IPv6 with final octets, missing mask
cidr = "fe80::0"
msg = validator(cidr, None)
error = "'%s' is not a valid IP subnet" % cidr
error = f"'{cidr}' is not a valid IP subnet"
self.assertEqual(error, msg)
# Invalid - Address format error
cidr = 'invalid'
msg = validator(cidr, None)
error = "'%s' is not a valid IP subnet" % cidr
error = f"'{cidr}' is not a valid IP subnet"
self.assertEqual(error, msg)
cidr = None
@@ -756,13 +756,13 @@ class TestAttributeValidation(base.BaseTestCase):
if allow_none:
self.assertIsNone(msg)
else:
error = "'%s' is not a valid IP subnet" % cidr
error = f"'{cidr}' is not a valid IP subnet"
self.assertEqual(error, msg)
# Invalid - IPv4 with trailing CR
cidr = "10.0.2.0/24\r"
msg = validator(cidr, None)
error = "'%s' is not a valid IP subnet" % cidr
error = f"'{cidr}' is not a valid IP subnet"
self.assertEqual(error, msg)
def test_validate_subnet(self):
@@ -782,7 +782,7 @@ class TestAttributeValidation(base.BaseTestCase):
# Invalid - CIDR
cidr = "192.168.1.1/8"
msg = validators.validate_route_cidr(cidr, None)
error = "'%s' is not a valid CIDR" % cidr
error = f"'{cidr}' is not a valid CIDR"
self.assertEqual(error, msg)
# Invalid - loopback CIDR
@@ -794,7 +794,7 @@ class TestAttributeValidation(base.BaseTestCase):
# Invalid - CIDR format error
cidr = 'invalid'
msg = validators.validate_route_cidr(cidr, None)
error = "'%s' is not a valid CIDR" % cidr
error = f"'{cidr}' is not a valid CIDR"
self.assertEqual(error, msg)
def test_validate_subnet_or_none(self):
@@ -811,7 +811,7 @@ class TestAttributeValidation(base.BaseTestCase):
cidrs = ['10.1.0.0/24', '10.2.0.0']
msg = validators.validate_subnet_list(cidrs)
error = "'%s' is not a valid IP subnet" % cidrs[1]
error = f"'{cidrs[1]}' is not a valid IP subnet"
self.assertEqual(error, msg)
def _test_validate_regex(self, validator, allow_none=False):
@@ -826,7 +826,7 @@ class TestAttributeValidation(base.BaseTestCase):
data = 'bat'
msg = validator(data, pattern)
self.assertEqual("'%s' is not a valid input" % data, msg)
self.assertEqual(f"'{data}' is not a valid input", msg)
data = 'hat'
msg = validator(data, pattern)
@@ -881,7 +881,7 @@ class TestAttributeValidation(base.BaseTestCase):
'e5069610-744bb-42a7-8bd8-ceac1a229cd4']
for uuid in invalid_uuids:
msg = validators.validate_uuid(uuid)
error = "'%s' is not a valid UUID" % uuid
error = f"'{uuid}' is not a valid UUID"
self.assertEqual(error, msg)
msg = validators.validate_uuid('00000000-ffff-ffff-ffff-000000000000')
@@ -893,7 +893,7 @@ class TestAttributeValidation(base.BaseTestCase):
'123']
msg = validators.validate_uuid_list(bad_uuid_list,
valid_values='parameter not used')
error = "'%s' is not a valid UUID" % bad_uuid_list[2]
error = f"'{bad_uuid_list[2]}' is not a valid UUID"
self.assertEqual(error, msg)
good_uuid_list = ['00000000-ffff-ffff-ffff-000000000000',
@@ -911,7 +911,7 @@ class TestAttributeValidation(base.BaseTestCase):
{'uuid': 'e5069610-744b-42a7-8bd8-ceac1a229cd4'}]
for item in items:
msg = validators._validate_list_of_items(mock.Mock(), item)
error = "'%s' is not a list" % item
error = f"'{item}' is not a list"
self.assertEqual(error, msg)
# check duplicate items in a list
@@ -935,7 +935,7 @@ class TestAttributeValidation(base.BaseTestCase):
def test__test__validate_list_of_items_non_empty(self):
items = None
msg = validators._validate_list_of_items_non_empty(mock.Mock(), items)
error = "'%s' is not a list" % items
error = f"'{items}' is not a list"
self.assertEqual(error, msg)
items = []
@@ -946,23 +946,23 @@ class TestAttributeValidation(base.BaseTestCase):
items = ['a', 'b', 'duplicate_1', 'duplicate_2', 'duplicate_1',
'duplicate_2', 'duplicate_2', 'c']
msg = validators._validate_list_of_items(mock.Mock(), items)
error = ("Duplicate items in the list: '%s'"
% 'duplicate_1, duplicate_2')
error = ("Duplicate items in the list: '{}'".format(
'duplicate_1, duplicate_2'))
self.assertEqual(error, msg)
items = [['a', 'b'], ['c', 'd'], ['a', 'b']]
msg = validators._validate_list_of_items(mock.Mock(), items)
error = "Duplicate items in the list: '%s'" % str(['a', 'b'])
error = "Duplicate items in the list: '{}'".format(str(['a', 'b']))
self.assertEqual(error, msg)
items = [{'a': 'b'}, {'c': 'd'}, {'a': 'b'}]
msg = validators._validate_list_of_items(mock.Mock(), items)
error = "Duplicate items in the list: '%s'" % str({'a': 'b'})
error = "Duplicate items in the list: '{}'".format(str({'a': 'b'}))
self.assertEqual(error, msg)
def test_validate_dict_type(self):
for value in (None, True, '1', []):
self.assertEqual("'%s' is not a dictionary" % value,
self.assertEqual(f"'{value}' is not a dictionary",
validators.validate_dict(value))
def test_validate_dict_without_constraints(self):
@@ -1077,7 +1077,7 @@ class TestAttributeValidation(base.BaseTestCase):
self.assertEqual("'abc' is not an integer", msg)
for value in (-1, '-2'):
self.assertEqual("'%s' should be non-negative" % value,
self.assertEqual(f"'{value}' should be non-negative",
validators.validate_non_negative(value))
for value in (0, 1, '2', True, False):

View File

@@ -95,8 +95,8 @@ class CallBacksManagerTestCase(base.BaseTestCase):
self.assertIsNotNone(
self.manager._callbacks[resources.PORT][events.BEFORE_CREATE])
self.assertIn(callback_id_1, self.manager._index)
self.assertEqual(self.__module__ + '.callback_1-%s' %
hash(callback_1), callback_id_1)
self.assertEqual(self.__module__ +
f'.callback_1-{hash(callback_1)}', callback_id_1)
self.assertEqual(cancellable,
self.manager._callbacks[resources.PORT]
[events.BEFORE_CREATE][0][2])

View File

@@ -172,26 +172,26 @@ class TestDeadLockDecorator(_base.BaseTestCase):
context.session.is_active = False
list_arg = [1, 2, 3, 4]
dict_arg = {1: 'a', 2: 'b'}
l, d = self._context_function(context, list_arg, dict_arg,
5, db_exc.DBDeadlock())
la, da = self._context_function(context, list_arg, dict_arg,
5, db_exc.DBDeadlock())
# even though we had 5 failures the list and dict should only
# be mutated once
self.assertEqual(5, len(l))
self.assertEqual(3, len(d))
self.assertEqual(5, len(la))
self.assertEqual(3, len(da))
def test_retry_if_session_inactive_kwargs_not_mutated_after_retries(self):
context = mock.Mock()
context.session.is_active = False
list_arg = [1, 2, 3, 4]
dict_arg = {1: 'a', 2: 'b'}
l, d = self._context_function(context, list_arg=list_arg,
dict_arg=dict_arg,
fail_count=5,
exc_to_raise=db_exc.DBDeadlock())
la, da = self._context_function(context, list_arg=list_arg,
dict_arg=dict_arg,
fail_count=5,
exc_to_raise=db_exc.DBDeadlock())
# even though we had 5 failures the list and dict should only
# be mutated once
self.assertEqual(5, len(l))
self.assertEqual(3, len(d))
self.assertEqual(5, len(la))
self.assertEqual(3, len(da))
def test_retry_if_session_inactive_no_retry_in_active_session(self):
context = mock.Mock()

View File

@@ -12,6 +12,7 @@
from unittest import mock
from neutron_lib._i18n import _
import neutron_lib.exceptions.l3_ext_ha_mode as lehm
from neutron_lib.tests.unit.exceptions import test_exceptions

View File

@@ -42,30 +42,30 @@ class HackingTestCase(base.BaseTestCase):
def test_use_jsonutils(self):
def __get_msg(fun):
msg = ("N521: jsonutils.%(fun)s must be used instead of "
"json.%(fun)s" % {'fun': fun})
msg = (f"N521: jsonutils.{fun} must be used instead of "
f"json.{fun}")
return [(0, msg)]
for method in ('dump', 'dumps', 'load', 'loads'):
self.assertEqual(
__get_msg(method),
list(checks.use_jsonutils("json.%s(" % method,
list(checks.use_jsonutils(f"json.{method}(",
"./neutron/common/rpc.py")))
self.assertEqual(
0,
len(list(checks.use_jsonutils("jsonx.%s(" % method,
len(list(checks.use_jsonutils(f"jsonx.{method}(",
"./neutron/common/rpc.py"))))
self.assertEqual(
0,
len(list(checks.use_jsonutils("json.%sx(" % method,
len(list(checks.use_jsonutils(f"json.{method}x(",
"./neutron/common/rpc.py"))))
self.assertEqual(
0,
len(list(checks.use_jsonutils(
"json.%s" % method,
f"json.{method}",
"./neutron/plugins/ml2/drivers/openvswitch/agent/xenapi/"
"etc/xapi.d/plugins/netwrap"))))
@@ -100,15 +100,15 @@ class HackingTestCase(base.BaseTestCase):
def test_no_log_translations(self):
for log in tc._all_log_levels:
for hint in tc._all_hints:
bad = 'LOG.{}({}("Bad"))'.format(log, hint)
bad = f'LOG.{log}({hint}("Bad"))'
self.assertEqual(
1, len(list(tc.no_translate_logs(bad, 'f'))))
# Catch abuses when used with a variable and not a literal
bad = 'LOG.{}({}(msg))'.format(log, hint)
bad = f'LOG.{log}({hint}(msg))'
self.assertEqual(
1, len(list(tc.no_translate_logs(bad, 'f'))))
# Do not do validations in tests
ok = 'LOG.%s(_("OK - unit tests"))' % log
ok = f'LOG.{log}(_("OK - unit tests"))'
self.assertEqual(
0, len(list(tc.no_translate_logs(ok, 'f/tests/f'))))

View File

@@ -74,7 +74,7 @@ class IPV6ModeEnumFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
self.assertEqual(f"'{in_val}'", self.field.stringify(in_val))
class DscpMarkFieldTest(test_base.BaseTestCase, TestField):
@@ -89,7 +89,7 @@ class DscpMarkFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("%s" % in_val, self.field.stringify(in_val))
self.assertEqual(f"{in_val}", self.field.stringify(in_val))
class IPNetworkPrefixLenFieldTest(test_base.BaseTestCase, TestField):
@@ -103,7 +103,7 @@ class IPNetworkPrefixLenFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("%s" % in_val, self.field.stringify(in_val))
self.assertEqual(f"{in_val}", self.field.stringify(in_val))
class MACAddressFieldTest(test_base.BaseTestCase, TestField):
@@ -126,7 +126,7 @@ class MACAddressFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual('%s' % in_val, self.field.stringify(in_val))
self.assertEqual(f'{in_val}', self.field.stringify(in_val))
class IPNetworkFieldTest(test_base.BaseTestCase, TestField):
@@ -151,7 +151,7 @@ class IPNetworkFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual('%s' % in_val, self.field.stringify(in_val))
self.assertEqual(f'{in_val}', self.field.stringify(in_val))
class IPVersionEnumFieldTest(test_base.BaseTestCase, TestField):
@@ -166,7 +166,7 @@ class IPVersionEnumFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("%s" % in_val, self.field.stringify(in_val))
self.assertEqual(f"{in_val}", self.field.stringify(in_val))
class FlowDirectionEnumFieldTest(test_base.BaseTestCase, TestField):
@@ -181,7 +181,7 @@ class FlowDirectionEnumFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
self.assertEqual(f"'{in_val}'", self.field.stringify(in_val))
class FlowDirectionAndAnyEnumFieldTest(test_base.BaseTestCase, TestField):
@@ -196,7 +196,7 @@ class FlowDirectionAndAnyEnumFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
self.assertEqual(f"'{in_val}'", self.field.stringify(in_val))
class PortRangesFieldTest(test_base.BaseTestCase, TestField):
@@ -226,7 +226,7 @@ class DomainNameFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
self.assertEqual(f"'{in_val}'", self.field.stringify(in_val))
class EtherTypeEnumFieldTest(test_base.BaseTestCase, TestField):
@@ -241,7 +241,7 @@ class EtherTypeEnumFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
self.assertEqual(f"'{in_val}'", self.field.stringify(in_val))
class IpProtocolEnumFieldTest(test_base.BaseTestCase, TestField):
@@ -261,7 +261,7 @@ class IpProtocolEnumFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
self.assertEqual(f"'{in_val}'", self.field.stringify(in_val))
class UUIDFieldTest(test_base.BaseTestCase, TestField):
@@ -281,7 +281,7 @@ class UUIDFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual('%s' % in_val, self.field.stringify(in_val))
self.assertEqual(f'{in_val}', self.field.stringify(in_val))
class DictOfMiscValuesFieldTest(test_base.BaseTestCase, TestField):
@@ -328,7 +328,7 @@ class NetworkSegmentRangeNetworkTypeEnumFieldTest(test_base.BaseTestCase,
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
self.assertEqual(f"'{in_val}'", self.field.stringify(in_val))
class NumaAffinityPoliciesEnumFieldTest(test_base.BaseTestCase, TestField):
@@ -346,7 +346,7 @@ class NumaAffinityPoliciesEnumFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
self.assertEqual(f"'{in_val}'", self.field.stringify(in_val))
class PortHardwareOffloadTypeEnumFieldTest(test_base.BaseTestCase, TestField):
@@ -361,4 +361,4 @@ class PortHardwareOffloadTypeEnumFieldTest(test_base.BaseTestCase, TestField):
def test_stringify(self):
for in_val, out_val in self.coerce_good_values:
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
self.assertEqual(f"'{in_val}'", self.field.stringify(in_val))

View File

@@ -64,17 +64,17 @@ class TestNoAuthClient(base.BaseTestCase):
@mock.patch.object(place_client.NoAuthClient, 'request')
def test_put(self, mock_request):
self.noauth_client.put('resource_providers/%s' % self.uuid,
self.noauth_client.put(f'resource_providers/{self.uuid}',
self.body_json, '')
mock_request.assert_called_with(
'placement/resource_providers/%s' % self.uuid, 'PUT',
f'placement/resource_providers/{self.uuid}', 'PUT',
body=self.body_json)
@mock.patch.object(place_client.NoAuthClient, 'request')
def test_delete(self, mock_request):
self.noauth_client.delete('resource_providers/%s' % self.uuid, '')
self.noauth_client.delete(f'resource_providers/{self.uuid}', '')
mock_request.assert_called_with(
'placement/resource_providers/%s' % self.uuid, 'DELETE')
f'placement/resource_providers/{self.uuid}', 'DELETE')
class TestPlacementAPIClientNoAuth(base.BaseTestCase):
@@ -127,7 +127,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.update_resource_provider(
RESOURCE_PROVIDER)
self.placement_fixture.mock_put.assert_called_once_with(
'/resource_providers/%s' % RESOURCE_PROVIDER_UUID,
f'/resource_providers/{RESOURCE_PROVIDER_UUID}',
{'name': RESOURCE_PROVIDER_NAME}
)
@@ -135,7 +135,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.ensure_resource_provider(
RESOURCE_PROVIDER)
self.placement_fixture.mock_put.assert_called_once_with(
'/resource_providers/%s' % RESOURCE_PROVIDER_UUID,
f'/resource_providers/{RESOURCE_PROVIDER_UUID}',
{'name': RESOURCE_PROVIDER_NAME}
)
self.placement_fixture.mock_post.assert_not_called()
@@ -155,12 +155,12 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.delete_resource_provider(
RESOURCE_PROVIDER_UUID)
self.placement_fixture.mock_delete.assert_called_once_with(
'/resource_providers/%s' % RESOURCE_PROVIDER_UUID)
f'/resource_providers/{RESOURCE_PROVIDER_UUID}')
def test_get_resource_provider(self):
self.placement_api_client.get_resource_provider(RESOURCE_PROVIDER_UUID)
self.placement_fixture.mock_get.assert_called_once_with(
'/resource_providers/%s' % RESOURCE_PROVIDER_UUID)
f'/resource_providers/{RESOURCE_PROVIDER_UUID}')
def test_get_resource_provider_no_resource_provider(self):
self.placement_fixture.mock_get.side_effect = ks_exc.NotFound()
@@ -213,7 +213,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.update_resource_provider_inventories(
RESOURCE_PROVIDER_UUID, INVENTORY, RESOURCE_PROVIDER_GENERATION)
self.placement_fixture.mock_put.assert_called_once_with(
'/resource_providers/%s/inventories' % RESOURCE_PROVIDER_UUID,
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/inventories',
expected_body)
def test_update_resource_provider_inventories_no_generation(self):
@@ -227,7 +227,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.update_resource_provider_inventories(
RESOURCE_PROVIDER_UUID, INVENTORY)
self.placement_fixture.mock_put.assert_called_once_with(
'/resource_providers/%s/inventories' % RESOURCE_PROVIDER_UUID,
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/inventories',
expected_body)
def test_update_resource_provider_inventories_no_rp(self):
@@ -243,9 +243,8 @@ class TestPlacementAPIClient(base.BaseTestCase):
RESOURCE_PROVIDER_UUID, RESOURCE_CLASS_NAME
)
self.placement_fixture.mock_delete.assert_called_once_with(
'/resource_providers/%(rp_uuid)s/inventories/%(rc_name)s' %
{'rp_uuid': RESOURCE_PROVIDER_UUID,
'rc_name': RESOURCE_CLASS_NAME}
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/inventories/'
f'{RESOURCE_CLASS_NAME}'
)
def test_delete_resource_provider_inventory_no_rp(self):
@@ -263,9 +262,8 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.get_inventory(RESOURCE_PROVIDER_UUID,
RESOURCE_CLASS_NAME)
self.placement_fixture.mock_get.assert_called_once_with(
'/resource_providers/%(rp_uuid)s/inventories/%(rc_name)s' %
{'rp_uuid': RESOURCE_PROVIDER_UUID,
'rc_name': RESOURCE_CLASS_NAME})
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/inventories/'
f'{RESOURCE_CLASS_NAME}')
def test_get_inventory_no_resource_provider(self):
_exception = ks_exc.NotFound()
@@ -301,9 +299,8 @@ class TestPlacementAPIClient(base.BaseTestCase):
RESOURCE_PROVIDER_UUID, INVENTORY, RESOURCE_CLASS_NAME,
resource_provider_generation=1)
self.placement_fixture.mock_put.assert_called_once_with(
'/resource_providers/%(rp_uuid)s/inventories/%(rc_name)s' %
{'rp_uuid': RESOURCE_PROVIDER_UUID,
'rc_name': RESOURCE_CLASS_NAME},
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/inventories/'
f'{RESOURCE_CLASS_NAME}',
expected_body)
def test_update_resource_provider_inventory_no_generation(self):
@@ -317,9 +314,8 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.update_resource_provider_inventory(
RESOURCE_PROVIDER_UUID, INVENTORY, RESOURCE_CLASS_NAME)
self.placement_fixture.mock_put.assert_called_once_with(
'/resource_providers/%(rp_uuid)s/inventories/%(rc_name)s' %
{'rp_uuid': RESOURCE_PROVIDER_UUID,
'rc_name': RESOURCE_CLASS_NAME},
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/inventories/'
f'{RESOURCE_CLASS_NAME}',
expected_body)
def test_update_resource_provider_inventory_not_found(self):
@@ -336,13 +332,13 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.associate_aggregates(RESOURCE_PROVIDER_UUID,
mock.ANY)
self.placement_fixture.mock_put.assert_called_once_with(
'/resource_providers/%s/aggregates' % RESOURCE_PROVIDER_UUID,
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/aggregates',
mock.ANY)
def test_list_aggregates(self):
self.placement_api_client.list_aggregates(RESOURCE_PROVIDER_UUID)
self.placement_fixture.mock_get.assert_called_once_with(
'/resource_providers/%s/aggregates' % RESOURCE_PROVIDER_UUID)
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/aggregates')
def test_list_aggregates_no_resource_provider(self):
self.placement_fixture.mock_get.side_effect = ks_exc.NotFound()
@@ -358,7 +354,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
def test_get_trait(self):
self.placement_api_client.get_trait(TRAIT_NAME)
self.placement_fixture.mock_get.assert_called_once_with(
'/traits/%s' % TRAIT_NAME)
f'/traits/{TRAIT_NAME}')
def test_get_trait_no_trait(self):
self.placement_fixture.mock_get.side_effect = ks_exc.NotFound()
@@ -370,7 +366,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
def test_create_trait(self):
self.placement_api_client.update_trait(TRAIT_NAME)
self.placement_fixture.mock_put.assert_called_once_with(
'/traits/%s' % TRAIT_NAME, None)
f'/traits/{TRAIT_NAME}', None)
def test_update_resource_provider_traits_generation(self):
traits = [TRAIT_NAME]
@@ -378,8 +374,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
RESOURCE_PROVIDER_UUID, traits,
resource_provider_generation=RESOURCE_PROVIDER_GENERATION)
self.placement_fixture.mock_put.assert_called_once_with(
'/resource_providers/%(rp_uuid)s/traits' %
{'rp_uuid': RESOURCE_PROVIDER_UUID},
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/traits',
{'resource_provider_generation': RESOURCE_PROVIDER_GENERATION,
'traits': traits})
@@ -391,8 +386,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.update_resource_provider_traits(
RESOURCE_PROVIDER_UUID, traits)
self.placement_fixture.mock_put.assert_called_once_with(
'/resource_providers/%(rp_uuid)s/traits' %
{'rp_uuid': RESOURCE_PROVIDER_UUID},
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/traits',
{'resource_provider_generation': RESOURCE_PROVIDER_GENERATION,
'traits': traits})
@@ -416,7 +410,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.list_resource_provider_traits(
RESOURCE_PROVIDER_UUID)
self.placement_fixture.mock_get.assert_called_once_with(
'/resource_providers/%s/traits' % RESOURCE_PROVIDER_UUID)
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/traits')
def test_list_resource_provider_traits_no_rp(self):
self.placement_fixture.mock_get.side_effect = ks_exc.NotFound()
@@ -428,7 +422,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
def test_delete_trait(self):
self.placement_api_client.delete_trait(TRAIT_NAME)
self.placement_fixture.mock_delete.assert_called_once_with(
'/traits/%s' % TRAIT_NAME)
f'/traits/{TRAIT_NAME}')
def test_delete_trait_no_trait(self):
self.placement_fixture.mock_delete.side_effect = ks_exc.NotFound()
@@ -441,7 +435,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
self.placement_api_client.delete_resource_provider_traits(
RESOURCE_PROVIDER_UUID)
self.placement_fixture.mock_delete.assert_called_once_with(
'/resource_providers/%s/traits' % RESOURCE_PROVIDER_UUID)
f'/resource_providers/{RESOURCE_PROVIDER_UUID}/traits')
def test_delete_resource_provider_traits_no_rp(self):
self.placement_fixture.mock_delete.side_effect = ks_exc.NotFound()
@@ -459,7 +453,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
def test_get_resource_class(self):
self.placement_api_client.get_resource_class(RESOURCE_CLASS_NAME)
self.placement_fixture.mock_get.assert_called_once_with(
'/resource_classes/%s' % RESOURCE_CLASS_NAME
f'/resource_classes/{RESOURCE_CLASS_NAME}'
)
def test_get_resource_class_no_resource_class(self):
@@ -480,12 +474,12 @@ class TestPlacementAPIClient(base.BaseTestCase):
def test_update_resource_class(self):
self.placement_api_client.update_resource_class(RESOURCE_CLASS_NAME)
self.placement_fixture.mock_put.assert_called_once_with(
'/resource_classes/%s' % RESOURCE_CLASS_NAME, None)
f'/resource_classes/{RESOURCE_CLASS_NAME}', None)
def test_delete_resource_class(self):
self.placement_api_client.delete_resource_class(RESOURCE_CLASS_NAME)
self.placement_fixture.mock_delete.assert_called_once_with(
'/resource_classes/%s' % RESOURCE_CLASS_NAME
f'/resource_classes/{RESOURCE_CLASS_NAME}'
)
def test_delete_resource_class_no_resource_class(self):
@@ -704,7 +698,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
def test_list_allocations(self):
self.placement_api_client.list_allocations(CONSUMER_UUID)
self.placement_fixture.mock_get.assert_called_once_with(
'/allocations/%s' % CONSUMER_UUID)
f'/allocations/{CONSUMER_UUID}')
def test_update_allocation(self):
mock_rsp = mock.Mock()
@@ -721,7 +715,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
'resources': {'a': 20}}
}})
self.placement_fixture.mock_put.assert_called_once_with(
'/allocations/%s' % CONSUMER_UUID,
f'/allocations/{CONSUMER_UUID}',
{'allocations': {
RESOURCE_PROVIDER_UUID: {
'resources': {'a': 20}}
@@ -744,7 +738,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
alloc_diff={RESOURCE_PROVIDER_UUID: {'a': 2, 'b': 2}},
)
self.placement_fixture.mock_put.assert_called_once_with(
'/allocations/%s' % CONSUMER_UUID,
f'/allocations/{CONSUMER_UUID}',
{'allocations': {
RESOURCE_PROVIDER_UUID: {
'resources': {'a': 5, 'b': 4}}
@@ -834,7 +828,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
alloc_diff={RESOURCE_PROVIDER_UUID: {'a': -3, 'b': -2}},
)
self.placement_fixture.mock_put.assert_called_once_with(
'/allocations/%s' % CONSUMER_UUID,
f'/allocations/{CONSUMER_UUID}',
{'allocations': {}})
def test_update_qos_allocation_one_class_to_zero(self):
@@ -846,7 +840,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
alloc_diff={RESOURCE_PROVIDER_UUID: {'a': -3, 'b': 1}},
)
self.placement_fixture.mock_put.assert_called_once_with(
'/allocations/%s' % CONSUMER_UUID,
f'/allocations/{CONSUMER_UUID}',
{'allocations': {
RESOURCE_PROVIDER_UUID: {
'resources': {'b': 3}}
@@ -861,7 +855,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
alloc_diff={RESOURCE_PROVIDER_UUID: {'a': -3, 'b': 1}},
)
self.placement_fixture.mock_put.assert_called_once_with(
'/allocations/%s' % CONSUMER_UUID,
f'/allocations/{CONSUMER_UUID}',
{'allocations': {
RESOURCE_PROVIDER_UUID: {
'resources': {'b': 1}}
@@ -881,7 +875,7 @@ class TestPlacementAPIClient(base.BaseTestCase):
},
)
self.placement_fixture.mock_put.assert_called_once_with(
'/allocations/%s' % CONSUMER_UUID,
f'/allocations/{CONSUMER_UUID}',
{'allocations': {
RESOURCE_PROVIDER_UUID: {
'resources': {'b': 4}},

View File

@@ -97,7 +97,7 @@ def dict2str(dic):
:returns: The dict in str representation that is a k=v command list for
each item in dic.
"""
return ','.join("{}={}".format(key, val)
return ','.join(f"{key}={val}"
for key, val in sorted(dic.items()))
@@ -209,8 +209,7 @@ def resolve_ref(ref):
def timecost(f):
call_id = uuidutils.generate_uuid()
message_base = ("Time-cost: call %(call_id)s function %(fname)s ") % {
"call_id": call_id, "fname": f.__name__}
message_base = (f"Time-cost: call {call_id} function {f.__name__} ")
end_message = (message_base + "took %(seconds).3fs seconds to run")
@timeutils.time_it(LOG, message=end_message, min_duration=None)

View File

@@ -42,7 +42,7 @@ def get_random_mac(base_mac):
random.getrandbits(8), random.getrandbits(8)]
if base_mac[3] != '00':
mac[3] = int(base_mac[3], 16)
return ':'.join(["%02x" % x for x in mac])
return ':'.join([f"{x:02x}" for x in mac])
def random_mac_generator(base_mac):

View File

@@ -27,8 +27,8 @@ def unstable_test(reason):
try:
return f(self, *args, **kwargs)
except Exception as e:
msg = ("%s was marked as unstable because of %s, "
"failure was: %s") % (self.id(), reason, e)
msg = (f"{self.id()} was marked as unstable because of "
f"{reason}, failure was: {e}")
raise self.skipTest(msg)
return inner
return decor

View File

@@ -93,9 +93,9 @@ class BaseWorker(service.ServiceBase):
if not desc:
desc = self.__class__.__name__
proctitle = "{}: {}".format(name, desc)
proctitle = f"{name}: {desc}"
if self._set_proctitle == "on":
proctitle += " (%s)" % self._parent_proctitle
proctitle += f" ({self._parent_proctitle})"
setproctitle.setproctitle(proctitle)

View File

@@ -13,3 +13,17 @@ disable_error_code = "import-untyped,var-annotated,import-not-found"
# honor excludes by not following there through imports
follow_imports = "silent"
files = "neutron_lib"
[tool.ruff]
line-length = 79
[tool.ruff.lint]
select = ["E4", "E5", "E7", "E9", "F", "S", "UP"]
ignore = [
"S104", # Possible binding to all interfaces
"S311", # Suspicious non-cryptographic random usage
"UP031", # Use format specifiers instead of percent format
]
[tool.ruff.lint.per-file-ignores]
"neutron_lib/tests/*" = ["S"]