pyupgrade changes for Python 3.10+

As discussed at the Flamingo PTG meeting, run an automated
upgrade tool to make code Python 3.10+ compliant.

Result of running:

$ pyupgrade --py310-plus $(git ls-files | grep ".py$")

Fixed PEP8 errors introduced by pyupgrade by running:

$ autopep8 --select=E127,E128,E501 --max-line-length 79 -r \
--in-place neutron

Also did manual updates as necessary to fix other errors
and warnings after above commands.

Bumped versions of checkers (pylint, bandit and mypy) to
more recent releases, which required disabling a new pylint
warning, too-many-positional-arguments.

Change-Id: Ic6908af2c331e3ea6c50f1a8a8e261db41572645
This commit is contained in:
Brian Haley
2025-04-09 20:03:21 -04:00
committed by Stephen Finucane
parent e191ee0a71
commit 27a98b5c62
124 changed files with 260 additions and 269 deletions

View File

@ -74,6 +74,7 @@ disable=
too-many-lines,
too-many-locals,
too-many-nested-blocks,
too-many-positional-arguments,
too-many-public-methods,
too-many-return-statements,
too-many-statements,

View File

@ -1421,7 +1421,7 @@ def _build_flow_expr_str(flow_dict, cmd, strict):
if key == 'proto':
flow_expr_arr.append(value)
else:
flow_expr_arr.append("{}={}".format(key, str(value)))
flow_expr_arr.append(f"{key}={str(value)}")
if actions:
flow_expr_arr.append(actions)

View File

@ -40,7 +40,7 @@ class DeferredCall:
return '{}({})'.format(
self.func.__name__,
', '.join([repr(x) for x in self.args] +
['{}={}'.format(k, repr(v))
[f'{k}={repr(v)}'
for k, v in self.kwargs.items()]))
def execute(self):

View File

@ -153,7 +153,7 @@ class DHCPIPv4Responder(dhcp_base.DHCPResponderBase):
if is_ack:
fqdn = 'host-%s' % ip_addr.replace('.', '-').replace(':', '-')
if cfg.CONF.dns_domain:
fqdn = '{}.{}'.format(fqdn, cfg.CONF.dns_domain)
fqdn = f'{fqdn}.{cfg.CONF.dns_domain}'
domain_name_bin = struct.pack('!%ds' % len(fqdn),
bytes(str(fqdn).encode()))
options.option_list.append(

View File

@ -221,7 +221,7 @@ class DHCPIPv6Responder(dhcp_base.DHCPResponderBase):
# 39: Fully Qualified Domain Name
fqdn = 'host-%s' % ip_addr.replace('.', '-').replace(':', '-')
if req_type == 'REQUEST' and cfg.CONF.dns_domain:
fqdn = '{}.{}'.format(fqdn, cfg.CONF.dns_domain)
fqdn = f'{fqdn}.{cfg.CONF.dns_domain}'
# 0000 0... = Reserved: 0x00
# .... .0.. = N bit: Server should perform DNS updates

View File

@ -138,7 +138,7 @@ class HostMedataHAProxyDaemonMonitor:
buf.write('%s' % _HOST_PATH_PROXY_TEMPLATE.render(
log_level='debug',
log_tag="{}-{}".format(PROXY_SERVICE_NAME, self._host_id),
log_tag=f"{PROXY_SERVICE_NAME}-{self._host_id}",
user=username,
group=groupname,
maxconn=1024,

View File

@ -236,7 +236,7 @@ class MetadataPathAgentExtension(
"for dev %s, error: %s") % (self.META_DEV_NAME, e)
raise RuntimeError(msg)
cidr = "%s/%s" % (
cidr = "{}/{}".format(
self.provider_gateway_ip,
netaddr.IPNetwork(self.provider_cidr).prefixlen)
ns_dev.addr.add(cidr)

View File

@ -146,7 +146,7 @@ class ConntrackHelperAgentExtension(l3_extension.L3AgentExtension):
def _get_chain_rules_list(self, conntrack_helper, wrap_name):
chain_name = self._get_chain_name(conntrack_helper.id)
chain_rule_list = [(DEFAULT_CONNTRACK_HELPER_CHAIN,
'-j {}-{}'.format(wrap_name, chain_name))]
f'-j {wrap_name}-{chain_name}')]
chain_rule_list.append((chain_name,
'-p %(proto)s --dport %(dport)s -j CT '
'--helper %(helper)s' %

View File

@ -146,7 +146,7 @@ class AgentMixin:
LOG.debug("Enqueueing router's %s state change to %s",
router_id, state)
state_change = threading.Thread(target=self._enqueue_state_change,
args=(router_id, state))
args=(router_id, state))
state_change.start()
# TODO(ralonsoh): remove once the eventlet deprecation is finished.
time.sleep(0)

View File

@ -120,8 +120,8 @@ class ItemAllocator:
self._write_allocations()
def _write_allocations(self):
current = ["{},{}\n".format(k, v) for k, v in self.allocations.items()]
remembered = ["{},{}\n".format(k, v)
current = [f"{k},{v}\n" for k, v in self.allocations.items()]
remembered = [f"{k},{v}\n"
for k, v in self.remembered.items()]
current.extend(remembered)
self._write(current)

View File

@ -26,8 +26,8 @@ class LinkLocalAddressPair(netaddr.IPNetwork):
# TODO(kevinbenton): the callers of this seem only interested in an IP,
# so we should just return two IPAddresses.
return (
netaddr.IPNetwork("{}/{}".format(self.network, self.prefixlen)),
netaddr.IPNetwork("{}/{}".format(self[-1], self.prefixlen)))
netaddr.IPNetwork(f"{self.network}/{self.prefixlen}"),
netaddr.IPNetwork(f"{self[-1]}/{self.prefixlen}"))
class LinkLocalAllocator(ItemAllocator):

View File

@ -293,7 +293,7 @@ class RouterInfo(BaseRouterInfo):
mark_id = self._address_scope_to_mark_id[address_scope]
# NOTE: Address scopes use only the upper 16 bits of the 32 fwmark
return "{}/{}".format(hex(mark_id << 16), ADDRESS_SCOPE_MARK_MASK)
return f"{hex(mark_id << 16)}/{ADDRESS_SCOPE_MARK_MASK}"
def get_port_address_scope_mark(self, port):
"""Get the IP version 4 and 6 address scope mark for the port
@ -642,7 +642,7 @@ class RouterInfo(BaseRouterInfo):
namespace=self.ns_name)
def address_scope_mangle_rule(self, device_name, mark_mask):
return '-i {} -j MARK --set-xmark {}'.format(device_name, mark_mask)
return f'-i {device_name} -j MARK --set-xmark {mark_mask}'
def address_scope_filter_rule(self, device_name, mark_mask):
return '-o {} -m mark ! --mark {} -j DROP'.format(

View File

@ -250,7 +250,7 @@ class Daemon:
self.run()
def _set_process_title(self):
proctitle = "{} ({})".format(self.procname, self._parent_proctitle)
proctitle = f"{self.procname} ({self._parent_proctitle})"
setproctitle.setproctitle(proctitle)
def run(self):

View File

@ -124,7 +124,7 @@ class DictModel(collections.abc.MutableMapping):
del self._dictmodel_internal_storage[name]
def __str__(self):
pairs = ['{}={}'.format(k, v) for k, v in
pairs = [f'{k}={v}' for k, v in
self._dictmodel_internal_storage.items()]
return ', '.join(sorted(pairs))
@ -164,7 +164,7 @@ class NetModel(DictModel):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._ns_name = "{}{}".format(NS_PREFIX, self.id)
self._ns_name = f"{NS_PREFIX}{self.id}"
@property
def namespace(self):
@ -770,7 +770,7 @@ class Dnsmasq(DhcpLocalProcess):
ip_addresses[0].replace('.', '-').replace(':', '-'))
fqdn = hostname
if self.conf.dns_domain:
fqdn = '{}.{}'.format(fqdn, self.conf.dns_domain)
fqdn = f'{fqdn}.{self.conf.dns_domain}'
return hostname, fqdn
@ -1847,7 +1847,7 @@ class DeviceManager:
for fixed_ip in port.fixed_ips:
subnet = fixed_ip.subnet
net = netaddr.IPNetwork(subnet.cidr)
ip_cidr = '{}/{}'.format(fixed_ip.ip_address, net.prefixlen)
ip_cidr = f'{fixed_ip.ip_address}/{net.prefixlen}'
ip_cidrs.append(ip_cidr)
need_ipv6_metadata = False
@ -1863,7 +1863,7 @@ class DeviceManager:
gateway = subnet.gateway_ip
if gateway:
net = netaddr.IPNetwork(subnet.cidr)
ip_cidrs.append('{}/{}'.format(gateway, net.prefixlen))
ip_cidrs.append(f'{gateway}/{net.prefixlen}')
if self.conf.force_metadata or self.conf.enable_isolated_metadata:
ip_cidrs.append(constants.METADATA_CIDR)

View File

@ -83,7 +83,7 @@ class ProcessManager(MonitoredProcess):
self.service_pid_fname = 'pid'
self.service = DEFAULT_SERVICE_NAME
process_tag = '{}-{}'.format(self.service, self.uuid)
process_tag = f'{self.service}-{self.uuid}'
self.cmd_addl_env = cmd_addl_env or {}
self.cmd_addl_env[PROCESS_TAG] = process_tag

View File

@ -123,7 +123,7 @@ class IpsetManager:
process_input = ["create {} hash:net family {}".format(new_set_name,
set_type)]
for ip in member_ips:
process_input.append("add {} {}".format(new_set_name, ip))
process_input.append(f"add {new_set_name} {ip}")
self._restore_sets(process_input)
self._swap_sets(new_set_name, set_name)

View File

@ -852,7 +852,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
args += ['--%s' % direction, '%s' % port_range_min]
else:
args += ['-m', 'multiport', '--%ss' % direction,
'{}:{}'.format(port_range_min, port_range_max)]
f'{port_range_min}:{port_range_max}']
return args
def _ip_prefix_arg(self, direction, ip_prefix):

View File

@ -77,12 +77,12 @@ def comment_rule(rule, comment):
comment = '-m comment --comment "%s"' % comment
if rule.startswith('-j'):
# this is a jump only rule so we just put the comment first
return '{} {}'.format(comment, rule)
return f'{comment} {rule}'
try:
jpos = rule.index(' -j ')
return ' '.join((rule[:jpos], comment, rule[jpos + 1:]))
except ValueError:
return '{} {}'.format(rule, comment)
return f'{rule} {comment}'
def get_chain_name(chain_name, wrap=True):
@ -120,10 +120,10 @@ class IptablesRule:
def __str__(self):
if self.wrap:
chain = '{}-{}'.format(self.wrap_name, self.chain)
chain = f'{self.wrap_name}-{self.chain}'
else:
chain = self.chain
rule = '-A {} {}'.format(chain, self.rule)
rule = f'-A {chain} {self.rule}'
# If self.rule is '' the above will cause a trailing space, which
# could cause us to not match on save/restore, so strip it now.
return comment_rule(rule.strip(), self.comment)
@ -193,7 +193,7 @@ class IptablesTable:
self.remove_rules += [str(r) for r in self.rules
if r.chain == name or jump_snippet in r.rule]
else:
jump_snippet = '-j {}-{}'.format(self.wrap_name, name)
jump_snippet = f'-j {self.wrap_name}-{name}'
# Remove rules from list that have a matching chain name or
# a matching jump chain
@ -225,7 +225,7 @@ class IptablesTable:
def _wrap_target_chain(self, s, wrap):
if s.startswith('$'):
s = ('{}-{}'.format(self.wrap_name, get_chain_name(s[1:], wrap)))
s = (f'{self.wrap_name}-{get_chain_name(s[1:], wrap)}')
return s
@ -578,7 +578,7 @@ class IptablesManager:
s += [('ip6tables', self.ipv6)]
all_commands = [] # variable to keep track all commands for return val
for cmd, tables in s:
args = ['{}-save'.format(cmd)]
args = [f'{cmd}-save']
if self.namespace:
args = ['ip', 'netns', 'exec', self.namespace] + args
try:
@ -622,7 +622,7 @@ class IptablesManager:
# always end with a new line
commands.append('')
args = ['{}-restore'.format(cmd), '-n']
args = [f'{cmd}-restore', '-n']
if self.namespace:
args = ['ip', 'netns', 'exec', self.namespace] + args
@ -682,7 +682,7 @@ class IptablesManager:
line.strip() not in rules]
# generate our list of chain names
our_chains = [':{}-{}'.format(self.wrap_name, name) for name in chains]
our_chains = [f':{self.wrap_name}-{name}' for name in chains]
# the unwrapped chains (e.g. neutron-filter-top) may already exist in
# the new_filter since they aren't marked by the wrap_name so we only

View File

@ -65,7 +65,7 @@ def get_free_range(parent_range, excluded_ranges, size=PRIMARY_VIP_RANGE_SIZE):
free_cidrs = netaddr.IPSet([parent_range]) - netaddr.IPSet(excluded_ranges)
for cidr in free_cidrs.iter_cidrs():
if cidr.prefixlen <= size:
return '{}/{}'.format(cidr.network, size)
return f'{cidr.network}/{size}'
raise ValueError(_('Network of size %(size)s, from IP range '
'%(parent_range)s excluding IP ranges '
@ -115,7 +115,7 @@ class KeepalivedVipAddress:
self.track)
def build_config(self):
result = '{} dev {}'.format(self.ip_address, self.interface_name)
result = f'{self.ip_address} dev {self.interface_name}'
if self.scope:
result += ' scope %s' % self.scope
if not self.track and _is_keepalived_use_no_track_supported():
@ -535,7 +535,7 @@ class KeepalivedTrackScript(KeepalivedConf):
def build_config_preamble(self):
config = ['',
'vrrp_script {}_{} {{'.format(HEALTH_CHECK_NAME, self.vr_id),
f'vrrp_script {HEALTH_CHECK_NAME}_{self.vr_id} {{',
' script "%s"' % self._get_script_location(),
' interval %s' % self.interval,
' fall 2',
@ -557,7 +557,7 @@ class KeepalivedTrackScript(KeepalivedConf):
return ''
config = [' track_script {',
' {}_{}'.format(HEALTH_CHECK_NAME, self.vr_id),
f' {HEALTH_CHECK_NAME}_{self.vr_id}',
' }']
return config
@ -575,7 +575,7 @@ class KeepalivedTrackScript(KeepalivedConf):
6: 'ping6',
}.get(netaddr.IPAddress(ip_addr).version)
return '{} -c 1 -w 1 {} 1>/dev/null || exit 1'.format(cmd, ip_addr)
return f'{cmd} -c 1 -w 1 {ip_addr} 1>/dev/null || exit 1'
def _check_ip_assigned(self):
cmd = 'ip a | grep %s || exit 0'

View File

@ -103,7 +103,7 @@ class FloatingIPTcCommandBase(ip_lib.IPDevice):
return filterids
def _add_filter(self, qdisc_id, direction, ip, rate, burst):
rate_value = "{}{}".format(rate, tc_lib.BW_LIMIT_UNIT)
rate_value = f"{rate}{tc_lib.BW_LIMIT_UNIT}"
burst_value = "{}{}".format(
tc_lib.TcCommand.get_ingress_qdisc_burst_value(rate, burst),
tc_lib.BURST_UNIT

View File

@ -22,7 +22,6 @@ import shlex
import socket
import threading
import time
import typing
import eventlet
from eventlet.green import subprocess
@ -185,7 +184,7 @@ def find_child_pids(pid, recursive=False):
def pgrep(
command: str,
entire_command_line: bool = True
) -> typing.Optional[str]:
) -> str | None:
cmd = ['pgrep']
if entire_command_line:
cmd += ['-f']
@ -254,7 +253,7 @@ def _get_conf_base(cfg_root, uuid, ensure_conf_dir):
def get_conf_file_name(cfg_root, uuid, cfg_file, ensure_conf_dir=False):
"""Returns the file name for a given kind of config file."""
conf_base = _get_conf_base(cfg_root, uuid, ensure_conf_dir)
return "{}.{}".format(conf_base, cfg_file)
return f"{conf_base}.{cfg_file}"
def get_value_from_file(filename, converter=None):

View File

@ -63,7 +63,7 @@ def create_consumers(endpoints, prefix, topic_details, start_listening=True):
topic_name = topics.get_topic_name(prefix, topic, operation)
connection.create_consumer(topic_name, endpoints, fanout=True)
if node_name:
node_topic_name = '{}.{}'.format(topic_name, node_name)
node_topic_name = f'{topic_name}.{node_name}'
connection.create_consumer(node_topic_name,
endpoints,
fanout=False)

View File

@ -157,7 +157,7 @@ class ExtensionMiddleware(base.ConfigurableMiddleware):
resource.collection)
for action, method in resource.collection_actions.items():
conditions = dict(method=[method])
path = "/{}/{}".format(resource.collection, action)
path = f"/{resource.collection}/{action}"
with mapper.submapper(controller=resource.controller,
action=action,
path_prefix=path_prefix,
@ -582,7 +582,7 @@ class RequestExtension:
self.url_route = url_route
self.handler = handler
self.conditions = dict(method=[method])
self.key = "{}-{}".format(method, url_route)
self.key = f"{method}-{url_route}"
class ActionExtension:

View File

@ -129,8 +129,8 @@ class Controller:
self._parent_id_name = None
parent_part = ''
self._plugin_handlers = {
self.LIST: 'get{}_{}'.format(parent_part, self._collection),
self.SHOW: 'get{}_{}'.format(parent_part, self._resource)
self.LIST: f'get{parent_part}_{self._collection}',
self.SHOW: f'get{parent_part}_{self._resource}'
}
for action in [self.CREATE, self.UPDATE, self.DELETE]:
self._plugin_handlers[action] = '{}{}_{}'.format(

View File

@ -109,12 +109,12 @@ class _MovedGlobals:
new_module, new_name = self._mg__default_new_mod, name
if new_module and new_name in vars(new_module):
old_location = '{}.{}'.format(old_module.__name__, name)
new_location = '{}.{}'.format(new_module.__name__, new_name)
old_location = f'{old_module.__name__}.{name}'
new_location = f'{new_module.__name__}.{new_name}'
changed = 'renamed' if old_module == new_module else 'moved'
debtcollector.deprecate(
old_location,
message='{} to {}'.format(changed, new_location),
message=f'{changed} to {new_location}',
stacklevel=4)
return vars(new_module)[new_name]

View File

@ -52,7 +52,7 @@ def valid_ipv6_url(host, port):
square brackets always required in ipv6 URI.
"""
if netutils.is_valid_ipv6(host):
uri = '[{}]:{}'.format(host, port)
uri = f'[{host}]:{port}'
else:
uri = '{}:{}'.format(host, port)
uri = f'{host}:{port}'
return uri

View File

@ -64,7 +64,7 @@ def acl_direction(r, port=None, port_group=None):
if port:
return '{} == "{}"'.format(portdir, port['id'])
return '{} == @{}'.format(portdir, port_group)
return f'{portdir} == @{port_group}'
def acl_ethertype(r):
@ -148,7 +148,7 @@ def add_acls_for_drop_port_group(pg_name):
"name": [],
"severity": [],
"direction": direction,
"match": '{} == @{} && ip'.format(p, pg_name)}
"match": f'{p} == @{pg_name} && ip'}
acl_list.append(acl)
return acl_list
@ -226,7 +226,7 @@ def acl_remote_group_id(r, ip_version):
src_or_dst = 'src' if r['direction'] == const.INGRESS_DIRECTION else 'dst'
addrset_name = utils.ovn_pg_addrset_name(r['remote_group_id'],
ip_version)
return ' && {}.{} == ${}'.format(ip_version, src_or_dst, addrset_name)
return f' && {ip_version}.{src_or_dst} == ${addrset_name}'
def acl_remote_address_group_id(r, ip_version):
@ -236,7 +236,7 @@ def acl_remote_address_group_id(r, ip_version):
src_or_dst = 'src' if r['direction'] == const.INGRESS_DIRECTION else 'dst'
addrset_name = utils.ovn_ag_addrset_name(r['remote_address_group_id'],
ip_version)
return ' && %s.%s == $%s' % (ip_version, src_or_dst, addrset_name)
return ' && {}.{} == ${}'.format(ip_version, src_or_dst, addrset_name)
def _add_sg_rule_acl_for_port_group(port_group, stateful, r):

View File

@ -202,7 +202,7 @@ def ovn_name(id):
# is a UUID. If so then there will be no matches.
# We prefix the UUID to enable us to use the Neutron UUID when
# updating, deleting etc.
return "{}{}".format(constants.OVN_NAME_PREFIX, id)
return f"{constants.OVN_NAME_PREFIX}{id}"
def ovn_lrouter_port_name(id):
@ -249,7 +249,7 @@ def ovn_addrset_name(sg_id, ip_version):
# as-<ip version>-<security group uuid>
# with all '-' replaced with '_'. This replacement is necessary
# because OVN doesn't support '-' in an address set name.
return ('as-{}-{}'.format(ip_version, sg_id)).replace('-', '_')
return (f'as-{ip_version}-{sg_id}').replace('-', '_')
def ovn_pg_addrset_name(sg_id, ip_version):
@ -258,7 +258,7 @@ def ovn_pg_addrset_name(sg_id, ip_version):
# pg-<security group uuid>-<ip version>
# with all '-' replaced with '_'. This replacement is necessary
# because OVN doesn't support '-' in an address set name.
return ('pg-{}-{}'.format(sg_id, ip_version)).replace('-', '_')
return (f'pg-{sg_id}-{ip_version}').replace('-', '_')
def ovn_ag_addrset_name(ag_id, ip_version):
@ -267,7 +267,7 @@ def ovn_ag_addrset_name(ag_id, ip_version):
# ag-<address group uuid>-<ip version>
# with all '-' replaced with '_'. This replacement is necessary
# because OVN doesn't support '-' in an address set name.
return ('ag-%s-%s' % (ag_id, ip_version)).replace('-', '_')
return ('ag-{}-{}'.format(ag_id, ip_version)).replace('-', '_')
def ovn_port_group_name(sg_id):

View File

@ -143,8 +143,8 @@ def get_dhcp_agent_device_id(network_id, host, segmentation_id=None):
local_hostname = host.split('.')[0]
host_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, str(local_hostname))
if not segmentation_id:
return 'dhcp{}-{}'.format(host_uuid, network_id)
return 'dhcp{}-{}-{}'.format(host_uuid, network_id, segmentation_id)
return f'dhcp{host_uuid}-{network_id}'
return f'dhcp{host_uuid}-{network_id}-{segmentation_id}'
def is_dns_servers_any_address(dns_servers, ip_version):
@ -376,7 +376,7 @@ def _hex_format(port, mask=0):
def hex_str(num):
return format(num, '#06x')
if mask > 0:
return "{}/{}".format(hex_str(port), hex_str(0xffff & ~mask))
return f"{hex_str(port)}/{hex_str(0xffff & ~mask)}"
return hex_str(port)
@ -840,7 +840,7 @@ def bytes_to_bits(value):
def bits_to_kilobits(
value: typing.Union[int, float],
value: int | float,
base: int
) -> int:
# NOTE(slaweq): round up that even 1 bit will give 1 kbit as a result, but

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing
from oslo_utils import timeutils
from neutron.common import utils
@ -48,7 +46,7 @@ def get_start_time(default=None, current_time=False):
return default
def get_api_worker_id() -> typing.Union[int, None]:
def get_api_worker_id() -> int | None:
"""Return the worker ID number provided by uWSGI"""
try:
# pylint: disable=import-outside-toplevel

View File

@ -11,7 +11,6 @@
# under the License.
from importlib.metadata import entry_points
import sys
from oslo_config import cfg
@ -20,16 +19,10 @@ from neutron._i18n import _
MIGRATION_ENTRYPOINTS = 'neutron.db.alembic_migrations'
if sys.version_info >= (3, 10):
migration_entrypoints = {
entrypoint.name: entrypoint
for entrypoint in entry_points(group=MIGRATION_ENTRYPOINTS)
}
else:
migration_entrypoints = {
entrypoint.name: entrypoint
for entrypoint in entry_points()[MIGRATION_ENTRYPOINTS]
}
migration_entrypoints = {
entrypoint.name: entrypoint
for entrypoint in entry_points(group=MIGRATION_ENTRYPOINTS)
}
INSTALLED_SUBPROJECTS = list(migration_entrypoints)

View File

@ -70,7 +70,7 @@ class AddressGroupDbMixin(ag_ext.AddressGroupPluginBase):
normalized_addrs = set()
for addr in req_addrs:
addr = netaddr.IPNetwork(addr)
normalized_addr = "{}/{}".format(addr.network, addr.prefixlen)
normalized_addr = f"{addr.network}/{addr.prefixlen}"
normalized_addrs.add(normalized_addr)
addrs_in_ag = []
addrs_not_in_ag = []

View File

@ -35,6 +35,6 @@ COLUMN = 'project_id'
def upgrade():
op.create_unique_constraint(
constraint_name='uniq_{}0{}'.format(TABLE, COLUMN),
constraint_name=f'uniq_{TABLE}0{COLUMN}',
table_name=TABLE,
columns=[COLUMN])

View File

@ -251,7 +251,7 @@ def _get_release_labels(labels):
for label in labels:
# release labels were introduced Liberty for a short time and dropped
# in that same release cycle
result.add('{}_{}'.format(migration.LIBERTY, label))
result.add(f'{migration.LIBERTY}_{label}')
return result

View File

@ -87,7 +87,7 @@ class IPAllocationPool(model_base.BASEV2, model_base.HasId):
last_ip = sa.Column(sa.String(64), nullable=False)
def __repr__(self):
return "{} - {}".format(self.first_ip, self.last_ip)
return f"{self.first_ip} - {self.last_ip}"
class IPAllocation(model_base.BASEV2):

View File

@ -33,7 +33,7 @@ def get_node_uuid(
group_name: str,
host: str,
worker_id: int) -> str:
node_str = '%s%s%s' % (group_name, host, str(worker_id))
node_str = '{}{}{}'.format(group_name, host, str(worker_id))
return uuid.uuid5(OVN_HASHRING_UUID_NAMESPACE, node_str).hex

View File

@ -343,7 +343,7 @@ class SecurityGroupInfoAPIMixin:
# only allow DHCP servers to talk to the appropriate IP address
# to avoid getting leases that don't match the Neutron IPs
prefix = '32' if ip_version == 4 else '128'
dests = ['{}/{}'.format(ip, prefix) for ip in port['fixed_ips']
dests = [f'{ip}/{prefix}' for ip in port['fixed_ips']
if netaddr.IPNetwork(ip).version == ip_version]
if ip_version == 4:
# v4 dhcp servers can also talk to broadcast

View File

@ -14,7 +14,6 @@
# under the License.
import abc
import typing
import netaddr
from neutron_lib.api import converters
@ -149,7 +148,7 @@ class SecurityGroupRuleInvalidEtherType(exceptions.InvalidInput):
"supported. Allowed values are %(values)s.")
def convert_protocol(value) -> typing.Optional[str]:
def convert_protocol(value) -> str | None:
if value in _constants.SG_RULE_PROTO_ANY:
return None
try:

View File

@ -197,7 +197,7 @@ class TaggingController:
# GET /v2.0/{obj_resource}/{obj_resource_id}/tags
ctx = request.context
rinfo = self._get_resource_info(ctx, kwargs)
policy.enforce(ctx, 'get_{}_{}'.format(rinfo.obj_type, TAGS),
policy.enforce(ctx, f'get_{rinfo.obj_type}_{TAGS}',
rinfo.obj)
return self.plugin.get_tags(ctx, rinfo.obj_type, rinfo.obj['id'])
@ -207,7 +207,7 @@ class TaggingController:
validate_tag(id)
ctx = request.context
rinfo = self._get_resource_info(ctx, kwargs)
policy.enforce(ctx, 'get_{}:{}'.format(rinfo.obj_type, TAGS),
policy.enforce(ctx, f'get_{rinfo.obj_type}:{TAGS}',
rinfo.obj)
return self.plugin.get_tag(ctx, rinfo.obj_type, rinfo.obj['id'], id)
@ -217,7 +217,7 @@ class TaggingController:
validate_tags(body)
ctx = request.context
rinfo = self._get_resource_info(ctx, kwargs, tags=body[TAGS])
policy.enforce(ctx, 'create_{}:{}'.format(rinfo.obj_type, TAGS),
policy.enforce(ctx, f'create_{rinfo.obj_type}:{TAGS}',
rinfo.obj)
validate_tags_limit(rinfo.obj_type, body['tags'])
notify_tag_action(ctx, 'create.start', rinfo.obj_type,
@ -234,7 +234,7 @@ class TaggingController:
validate_tag(id)
ctx = request.context
rinfo = self._get_resource_info(ctx, kwargs, tags=[id])
policy.enforce(ctx, 'update_{}:{}'.format(rinfo.obj_type, TAGS),
policy.enforce(ctx, f'update_{rinfo.obj_type}:{TAGS}',
rinfo.obj)
current_tags = self.plugin.get_tags(
ctx, rinfo.obj_type, rinfo.obj['id'])['tags']

View File

@ -40,7 +40,7 @@ class IpamAllocationPool(model_base.BASEV2, model_base.HasId):
last_ip = sa.Column(sa.String(64), nullable=False)
def __repr__(self):
return "{} - {}".format(self.first_ip, self.last_ip)
return f"{self.first_ip} - {self.last_ip}"
class IpamSubnet(model_base.BASEV2, model_base.HasId):

View File

@ -343,7 +343,8 @@ class SubnetRequestFactory:
prefixlen = subnet['prefixlen']
if not validators.is_attr_set(prefixlen):
prefixlen = int(subnetpool['default_prefixlen'])
gw_ip_net = netaddr.IPNetwork('%s/%s' % (gateway_ip, prefixlen))
gw_ip_net = netaddr.IPNetwork(
'{}/{}'.format(gateway_ip, prefixlen))
cidr = gw_ip_net.cidr
# TODO(ralonsoh): "tenant_id" reference should be removed.

View File

@ -18,7 +18,6 @@ import functools
import itertools
import sys
import traceback
import typing
from neutron_lib.db import api as db_api
from neutron_lib.db import model_base
@ -438,10 +437,10 @@ class DeclarativeObject(abc.ABCMeta):
class NeutronDbObject(NeutronObject, metaclass=DeclarativeObject):
# should be set for all persistent objects
db_model: typing.Optional[model_base.BASEV2] = None
db_model: model_base.BASEV2 | None = None
# should be set for all rbac aware objects
rbac_db_cls: typing.Optional[model_base.BASEV2] = None
rbac_db_cls: model_base.BASEV2 | None = None
primary_keys = ['id']

View File

@ -83,10 +83,10 @@ def _safe_get_object(obj_cls, context, **kwargs):
db_obj = get_object(obj_cls, context, **kwargs)
if db_obj is None:
key = ", ".join(['{}={}'.format(key, value) for (key, value)
key = ", ".join([f'{key}={value}' for (key, value)
in kwargs.items()])
raise n_exc.ObjectNotFound(
id="{}({})".format(obj_cls.db_model.__name__, key))
id=f"{obj_cls.db_model.__name__}({key})")
return db_obj

View File

@ -165,8 +165,8 @@ class NeutronPecanController:
self._parent_id_name = ('%s_id' % self.parent
if self.parent else None)
self._plugin_handlers = {
self.LIST: 'get{}_{}'.format(parent_resource, self.collection),
self.SHOW: 'get{}_{}'.format(parent_resource, self.resource)
self.LIST: f'get{parent_resource}_{self.collection}',
self.SHOW: f'get{parent_resource}_{self.resource}'
}
for action in [self.CREATE, self.UPDATE, self.DELETE]:
self._plugin_handlers[action] = '{}{}_{}'.format(

View File

@ -43,7 +43,7 @@ class NotifierHook(hooks.PecanHook):
if utils.is_member_action(utils.get_controller(state)):
return
action = pecan_constants.ACTION_MAP.get(state.request.method)
event = '{}.{}.start'.format(resource, action)
event = f'{resource}.{action}.start'
if action in ('create', 'update'):
# notifier just gets plain old body without any treatment other
# than the population of the object ID being operated on
@ -96,7 +96,7 @@ class NotifierHook(hooks.PecanHook):
else:
result = state.response.json
notifier_method = '{}.{}.end'.format(resource_name, action)
notifier_method = f'{resource_name}.{action}.end'
notifier_action = utils.get_controller(state).plugin_handlers[action]
registry.publish(resource_name, events.BEFORE_RESPONSE, self,
payload=events.APIEventPayload(

View File

@ -233,7 +233,7 @@ class PolicyHook(hooks.PecanHook):
context,
# NOTE(kevinbenton): this used to reference a
# _plugin_handlers dict, why?
'get_{}:{}'.format(resource, attr_name),
f'get_{resource}:{attr_name}',
data,
might_not_exist=True,
pluralized=collection):

View File

@ -157,7 +157,7 @@ class SegmentTypeDriver(BaseTypeDriver):
LOG.debug(' - Non allocated segments:')
for non_allocated_segment in (
self.segmentation_obj.get_all_unallocated_segments(context,
**filters)):
**filters)):
LOG.debug(' - %s', non_allocated_segment)
if directory.get_plugin(plugin_constants.NETWORK_SEGMENT_RANGE):
LOG.debug(' - Network segment ranges:')

View File

@ -27,4 +27,4 @@ def get_vlan_device_name(src_dev, vlan):
# Ensure that independent of the vlan len the same name prefix is used.
src_dev = plugin_utils.get_interface_name(
src_dev, max_len=n_const.DEVICE_NAME_MAX_LEN - MAX_VLAN_POSTFIX_LEN)
return "{}.{}".format(src_dev, vlan)
return f"{src_dev}.{vlan}"

View File

@ -495,7 +495,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
segmentation_id = local_vlan_map.get('segmentation_id')
if net_uuid:
# TODO(sahid): This key thing should be normalized.
key = "{}/{}".format(net_uuid, segmentation_id)
key = f"{net_uuid}/{segmentation_id}"
if (key not in self._local_vlan_hints and
local_vlan != ovs_const.DEAD_VLAN_TAG):
self.available_local_vlans.remove(local_vlan)
@ -1027,7 +1027,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
except vlanmanager.MappingNotFound:
# TODO(sahid): This local_vlan_hints should have its own
# datastructure and model to be manipulated.
key = "{}/{}".format(net_uuid, segmentation_id)
key = f"{net_uuid}/{segmentation_id}"
lvid = self._local_vlan_hints.pop(key, None)
if lvid is None:
if not self.available_local_vlans:
@ -2516,7 +2516,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
remote_tunnel_hash = cls.get_tunnel_hash(remote_ip, hashlen)
if not remote_tunnel_hash:
return None
return '{}-{}'.format(network_type, remote_tunnel_hash)
return f'{network_type}-{remote_tunnel_hash}'
def _agent_has_updates(self, polling_manager):
return (polling_manager.is_polling_required or

View File

@ -87,7 +87,7 @@ def _add_gateway_chassis(api, txn, lrp_name, val):
prio = len(val)
uuid_list = []
for chassis in val:
gwc_name = '{}_{}'.format(lrp_name, chassis)
gwc_name = f'{lrp_name}_{chassis}'
try:
gwc = idlutils.row_by_value(
api.idl, 'Gateway_Chassis', 'name', gwc_name)

View File

@ -122,7 +122,7 @@ class OVNClientQosExtension:
in_or_out = 'outport'
src_or_dst = 'dst'
match = '{} == "{}"'.format(in_or_out, port_id)
match = f'{in_or_out} == "{port_id}"'
if ip_address and resident_port:
match += (' && ip4.%s == %s && is_chassis_resident("%s")' %
(src_or_dst, ip_address, resident_port))

View File

@ -612,7 +612,7 @@ class OvsdbNbOvnIdl(nb_impl_idl.OvnNbApiIdlImpl, Backend):
if not physnet:
continue
lrp_name = '{}{}'.format(ovn_const.LRP_PREFIX, port)
lrp_name = f'{ovn_const.LRP_PREFIX}{port}'
original_state = self.get_gateway_chassis_binding(lrp_name)
az_hints = self.get_gateway_chassis_az_hints(lrp_name)
# Filter out chassis that lost physnet, the cms option,

View File

@ -210,7 +210,7 @@ class OVNClient:
subnet_opt = subnet_opts.get(opt)
if not subnet_opt:
return port_opt
return '{{{}, {}}}'.format(subnet_opt[1:-1], port_opt[1:-1])
return f'{{{subnet_opt[1:-1]}, {port_opt[1:-1]}}}'
def _get_port_dhcp_options(self, port, ip_version):
"""Return dhcp options for port.

View File

@ -309,13 +309,13 @@ class DNSExtensionDriver(api.ExtensionDriver):
hostname = dns_name
fqdn = dns_name
if not dns_name.endswith('.'):
fqdn = '{}.{}'.format(dns_name, dns_domain)
fqdn = f'{dns_name}.{dns_domain}'
else:
hostname = 'host-%s' % ip['ip_address'].replace(
'.', '-').replace(':', '-')
fqdn = hostname
if dns_domain:
fqdn = '{}.{}'.format(hostname, dns_domain)
fqdn = f'{hostname}.{dns_domain}'
dns_assignment.append({'ip_address': ip['ip_address'],
'hostname': hostname,
'fqdn': fqdn})

View File

@ -163,7 +163,7 @@ def _build_subattr_match_rule(attr_name, attr, action, target):
return
if key[0].startswith('type:list_of_dict'):
target_attributes = set([])
target_attributes = set()
for _attrs in target[attr_name]:
target_attributes = target_attributes.union(set(_attrs.keys()))
else:
@ -182,7 +182,7 @@ def _build_list_of_subattrs_rule(attr_name, attribute_value, action):
if isinstance(sub_attr, dict):
for k in sub_attr:
rules.append(policy.RuleCheck(
'rule', '{}:{}:{}'.format(action, attr_name, k)))
'rule', f'{action}:{attr_name}:{k}'))
if rules:
return policy.AndCheck(rules)
@ -227,7 +227,7 @@ def _build_match_rule(action, target, pluralized):
attribute = res_map[resource][attribute_name]
if 'enforce_policy' in attribute:
attr_rule = policy.RuleCheck(
'rule', '{}:{}'.format(action, attribute_name))
'rule', f'{action}:{attribute_name}')
# Build match entries for sub-attributes
if _should_validate_sub_attributes(
attribute, target[attribute_name]):
@ -277,7 +277,7 @@ class OwnerCheck(policy.Check):
match)
LOG.exception(err_reason)
raise exceptions.PolicyInitError(
policy="{}:{}".format(kind, match),
policy=f"{kind}:{match}",
reason=err_reason)
self._cache = cache._get_memory_cache_region(expiration_time=5)
super().__init__(kind, match)
@ -349,7 +349,7 @@ class OwnerCheck(policy.Check):
self.target_field)
LOG.error(err_reason)
raise exceptions.PolicyCheckError(
policy="{}:{}".format(self.kind, self.match),
policy=f"{self.kind}:{self.match}",
reason=err_reason)
parent_foreign_key = _RESOURCE_FOREIGN_KEYS.get(
"%ss" % parent_res, None)
@ -367,7 +367,7 @@ class OwnerCheck(policy.Check):
{'match': self.match, 'res': parent_res})
LOG.error(err_reason)
raise exceptions.PolicyCheckError(
policy="{}:{}".format(self.kind, self.match),
policy=f"{self.kind}:{self.match}",
reason=err_reason)
target_copy[self.target_field] = self._extract(

View File

@ -15,7 +15,6 @@
import os
from os import path
import re
import typing
from eventlet.green import subprocess
from neutron_lib.utils import helpers
@ -61,7 +60,7 @@ def read_file(_path: str) -> str:
@privileged.default.entrypoint
def write_to_tempfile(content: bytes,
_path: typing.Optional[str] = None,
_path: str | None = None,
suffix: str = '',
prefix: str = 'tmp'):
return fileutils.write_to_tempfile(content, path=_path, suffix=suffix,

View File

@ -27,8 +27,8 @@ def _connection_to_manager_uri(conn_uri):
ip, port = netutils.parse_host_port(addr)
if port is not None:
ip = netutils.escape_ipv6(ip)
return 'p{}:{}:{}'.format(proto, port, ip)
return 'p{}:{}'.format(proto, addr)
return f'p{proto}:{port}:{ip}'
return f'p{proto}:{addr}'
@privileged.ovs_vsctl_cmd.entrypoint

View File

@ -45,7 +45,7 @@ def profile(f):
if not cfg.CONF.enable_code_profiling:
return f(*args, **kwargs)
profid = "{}.{}".format(f.__module__, f.__name__)
profid = f"{f.__module__}.{f.__name__}"
profiler = cProfile.Profile()
start_time = datetime.now()
try:

View File

@ -76,7 +76,7 @@ class Designate(driver.ExternalDNSService):
if not CONF.designate.allow_reverse_dns_lookup:
return
# Set up the PTR records
recordset_name = '{}.{}'.format(dns_name, dns_domain)
recordset_name = f'{dns_name}.{dns_domain}'
ptr_zone_email = 'admin@%s' % dns_domain[:-1]
if CONF.designate.ptr_zone_email:
ptr_zone_email = CONF.designate.ptr_zone_email

View File

@ -249,7 +249,7 @@ class IptablesMeteringDriver(abstract_driver.MeteringAbstractDriver):
if rule['excluded']:
ipt_rule = '%s -j RETURN' % ipt_rule
else:
ipt_rule = '{} -j {}'.format(ipt_rule, label_chain)
ipt_rule = f'{ipt_rule} -j {label_chain}'
return ipt_rule
@staticmethod
@ -261,7 +261,7 @@ class IptablesMeteringDriver(abstract_driver.MeteringAbstractDriver):
source_ip_prefix = rule.get('source_ip_prefix')
if source_ip_prefix:
iptables_rule = "-s {} {}".format(source_ip_prefix, iptables_rule)
iptables_rule = f"-s {source_ip_prefix} {iptables_rule}"
destination_ip_prefix = rule.get('destination_ip_prefix')
if destination_ip_prefix:
@ -274,9 +274,9 @@ class IptablesMeteringDriver(abstract_driver.MeteringAbstractDriver):
def prepare_source_and_destination_rule_legacy(ext_dev, rule):
remote_ip = rule['remote_ip_prefix']
if rule['direction'] == 'egress':
ipt_rule = '-s {} -o {}'.format(remote_ip, ext_dev)
ipt_rule = f'-s {remote_ip} -o {ext_dev}'
else:
ipt_rule = '-d {} -i {}'.format(remote_ip, ext_dev)
ipt_rule = f'-d {remote_ip} -i {ext_dev}'
return ipt_rule
def _process_ns_specific_metering_label(self, router, ext_dev, im):

View File

@ -31,7 +31,7 @@ def get_br_int_port_name(prefix, port_id):
The port name is the one that plumbs into the integration bridge.
"""
return ("{}i-{}".format(prefix, port_id))[:constants.DEVICE_NAME_MAX_LEN]
return (f"{prefix}i-{port_id}")[:constants.DEVICE_NAME_MAX_LEN]
def get_br_trunk_port_name(prefix, port_id):
@ -39,7 +39,7 @@ def get_br_trunk_port_name(prefix, port_id):
The port name is the one that plumbs into the trunk bridge.
"""
return ("{}t-{}".format(prefix, port_id))[:constants.DEVICE_NAME_MAX_LEN]
return (f"{prefix}t-{port_id}")[:constants.DEVICE_NAME_MAX_LEN]
def get_patch_peer_attrs(peer_name, port_mac=None, port_id=None):

View File

@ -88,7 +88,7 @@ def wait_until_pkt_meter_rule_applied_ovs(bridge, port_vif, port_id,
def _pkt_rate_limit_rule_applied():
port_num = bridge.get_port_ofport(port_vif)
port_vlan = bridge.get_port_tag_by_name(port_vif)
key = "{}_{}_{}".format(type_, port_id, direction)
key = f"{type_}_{port_id}_{direction}"
meter_id = bridge.get_value_from_other_config(
port_vif, key, value_type=int)

View File

@ -81,7 +81,7 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
def _append_suffix(dev_name):
# If dev_name = 'xyz123' and the suffix is 'hostB' then the result
# will be 'xy_stB'
return '{}_{}'.format(dev_name[:-4], cfg.CONF.test_namespace_suffix[-3:])
return f'{dev_name[:-4]}_{cfg.CONF.test_namespace_suffix[-3:]}'
def get_internal_device_name(ri, port_id):

View File

@ -38,7 +38,7 @@ def get_tunnel_name_full(cls, network_type, local_ip, remote_ip):
local_tunnel_hash = encodeutils.to_utf8(local_tunnel_hash)
source_ip_hash = hashlib.sha1(local_tunnel_hash).hexdigest()[:hashlen]
return '{}-{}-{}'.format(network_type, source_ip_hash, remote_ip_hash)
return f'{network_type}-{source_ip_hash}-{remote_ip_hash}'
ovs_neutron_agent.OVSNeutronAgent.get_tunnel_name = get_tunnel_name_full

View File

@ -22,7 +22,7 @@ from neutron.tests.common.exclusive_resources import resource_allocator
def _get_random_network(low, high, netmask):
ip = ip_address.get_random_ip(low, high)
return str(netaddr.IPNetwork("{}/{}".format(ip, netmask)).cidr)
return str(netaddr.IPNetwork(f"{ip}/{netmask}").cidr)
class ExclusiveIPNetwork(resource_allocator.ExclusiveResource):

View File

@ -261,7 +261,7 @@ def create_patch_ports(source, destination):
:param destination: Instance of OVSBridge
"""
common = common_utils.get_rand_name(max_length=4, prefix='')
prefix = '{}-{}-'.format(PATCH_PREFIX, common)
prefix = f'{PATCH_PREFIX}-{common}-'
source_name = common_utils.get_rand_device_name(prefix=prefix)
destination_name = common_utils.get_rand_device_name(prefix=prefix)

View File

@ -437,7 +437,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
collection = "%ss" % resource
for i in range(number):
obj = copy.deepcopy(data)
obj[resource]['name'] = "{}_{}".format(name, i)
obj[resource]['name'] = f"{name}_{i}"
if 'override' in kwargs and i in kwargs['override']:
obj[resource].update(kwargs['override'][i])
objects.append(obj)
@ -516,7 +516,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
if ip_version == constants.IP_VERSION_6:
base_cidr = "fd%s::/64"
overrides = dict(zip(range(number),
[{'cidr': base_cidr % num} for num in range(number)]))
[{'cidr': base_cidr % num}
for num in range(number)]))
kwargs.update({'override': overrides})
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
@ -665,7 +666,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _make_security_group(self, fmt, name=None, expected_res_status=None,
project_id=None, is_admin=False):
name = name or 'sg-{}'.format(uuidutils.generate_uuid())
name = name or f'sg-{uuidutils.generate_uuid()}'
project_id = project_id or self._tenant_id
data = {'security_group': {'name': name,
'description': name,
@ -7405,7 +7406,7 @@ class DbOperationBoundMixin:
# using filters shouldn't change the count either
if filters:
query_params = "&".join(
["{}={}".format(f, obj[f]) for f in filters])
[f"{f}={obj[f]}" for f in filters])
after_queries = self._list_and_record_queries(plural, query_params)
self.assertEqual(len(before_queries), len(after_queries),
self._qry_fail_msg(before_queries, after_queries))

View File

@ -33,7 +33,7 @@ OPTS = [
def _get_namespace_name(id_, suffix=None):
suffix = suffix or cfg.CONF.test_namespace_suffix
return "{}{}{}".format(linux_dhcp.NS_PREFIX, id_, suffix)
return f"{linux_dhcp.NS_PREFIX}{id_}{suffix}"
def NetModel_init(self, d):

View File

@ -93,7 +93,7 @@ class BaseFullStackTestCase(testlib_api.MySQLTestCaseMixin,
def get_name(self):
class_name, test_name = self.id().split(".")[-2:]
return "{}.{}".format(class_name, test_name)
return f"{class_name}.{test_name}"
def _wait_until_agent_up(self, agent_id):
def _agent_up():

View File

@ -223,7 +223,7 @@ class OVSConfigFixture(ConfigFixture):
self.phy_br_name = utils.get_rand_device_name(prefix='br-eth')
self.meta_br_name = self._generate_meta_bridge()
bridge_mappings = '{}:{}'.format(physnet, self.phy_br_name)
bridge_mappings = f'{physnet}:{self.phy_br_name}'
if env_desc.has_metadata:
bridge_mappings += ',{}:{}'.format('meta', self.meta_br_name)
self.config.update({

View File

@ -88,7 +88,7 @@ class EnvironmentDescription:
return self.network_type in ('vxlan', 'gre')
def __str__(self):
return '{}'.format(vars(self))
return f'{vars(self)}'
class HostDescription:
@ -113,7 +113,7 @@ class HostDescription:
self.segmented_physnet = segmented_physnet
def __str__(self):
return '{}'.format(vars(self))
return f'{vars(self)}'
class Host(fixtures.Fixture):

View File

@ -133,7 +133,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
else:
self._ip = fixed_ip['ip_address']
prefixlen = netaddr.IPNetwork(subnet['subnet']['cidr']).prefixlen
self._ip_cidr = '{}/{}'.format(self._ip, prefixlen)
self._ip_cidr = f'{self._ip}/{prefixlen}'
self.gateway_ip = subnet['subnet']['gateway_ip']
if self.use_dhcp:

View File

@ -67,7 +67,7 @@ class ProcessFixture(fixtures.Fixture):
fileutils.ensure_tree(log_dir, mode=0o755)
timestamp = datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S-%f")
log_file = "{}--{}.log".format(self.process_name, timestamp)
log_file = f"{self.process_name}--{timestamp}.log"
run_as_root = bool(self.namespace)
exec_name = (self.exec_name
if run_as_root

View File

@ -178,7 +178,7 @@ class TestL3Agent(base.BaseFullStackTestCase):
suffix = agent.get_namespace_suffix()
else:
suffix = self.environment.hosts[0].l3_agent.get_namespace_suffix()
return "{}@{}".format(namespace, suffix)
return f"{namespace}@{suffix}"
def _get_l3_agents_with_ha_state(
self, router_id, ha_state=None):

View File

@ -60,7 +60,7 @@ class BaseLoggingTestCase(base.BaseFullStackTestCase):
flows = vm.bridge.dump_flows_for_table(table)
flows_list = flows.splitlines()
pattern = re.compile(
r"^.* table={}.* actions={}".format(table, actions))
fr"^.* table={table}.* actions={actions}")
for flow in flows_list:
if pattern.match(flow.strip()):
return True

View File

@ -184,7 +184,8 @@ class OvsMetadataExtensionTestCase(base.BaseFullStackTestCase):
flows_list = flows.splitlines()
LOG.info("Metadata bridge flows_list: %s", flows_list)
pattern = re.compile(
r"^.* table=%s,.* actions=%s" % (table, re.escape(actions)))
r"^.* table={},.* actions={}".format(table,
re.escape(actions)))
for flow in flows_list:
if pattern.match(flow.strip()):
return True

View File

@ -142,7 +142,7 @@ class L3AgentFipPortForwardingExtensionTestFramework(
conf_path = os.path.join(keepalived_pm.pids_path, keepalived_pm.uuid,
'keepalived.conf')
regex = "{} dev {}".format(fip_pf, interface_name)
regex = f"{fip_pf} dev {interface_name}"
pattern = re.compile(regex)
def check_harouter_fip_is_set():

View File

@ -616,8 +616,8 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
ns_name = "{}{}{}".format(
'qrouter-' + router_info['id'],
self.NESTED_NAMESPACE_SEPARATOR, agent.host)
ext_name = "qg-{}-{}".format(agent.host, _uuid()[-4:])
int_name = "qr-{}-{}".format(agent.host, _uuid()[-4:])
ext_name = f"qg-{agent.host}-{_uuid()[-4:]}"
int_name = f"qr-{agent.host}-{_uuid()[-4:]}"
get_ns_name = mock.patch.object(
namespaces.RouterNamespace, '_get_ns_name').start()

View File

@ -91,7 +91,7 @@ class ConntrackdManagerTestCase(base.BaseSudoTestCase):
try:
with open(config_path) as conf:
return conf.read()
except (OSError, IOError) as e:
except OSError as e:
if e.errno != errno.ENOENT:
raise
return ''

View File

@ -69,7 +69,7 @@ class OVSAgentTestBase(test_ovs_lib.OVSBridgeTestBase,
trace[l] = r
for k in required_keys:
if k not in trace:
self.fail("{} not found in trace {}".format(k, trace_lines))
self.fail(f"{k} not found in trace {trace_lines}")
return trace
@ -96,10 +96,10 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self.addOnException(self.collect_flows_and_ports)
def collect_flows_and_ports(self, exc_info):
nicevif = lambda x: ['{}={}'.format(k, getattr(x, k))
nicevif = lambda x: [f'{k}={getattr(x, k)}'
for k in ['ofport', 'port_name', 'switch',
'vif_id', 'vif_mac']]
nicedev = lambda x: ['{}={}'.format(k, getattr(x, k))
nicedev = lambda x: [f'{k}={getattr(x, k)}'
for k in ['name', 'namespace']] + x.addr.list()
details = {'flows': self.br.dump_all_flows(),
'vifs': map(nicevif, self.br.get_vif_ports()),

View File

@ -434,7 +434,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
if enable_chassis_as_extport:
append_cms_options(other_config, 'enable-chassis-as-extport-host')
bridge_mapping = ",".join(["{}:br-provider{}".format(phys_net, i)
bridge_mapping = ",".join([f"{phys_net}:br-provider{i}"
for i, phys_net in enumerate(physical_nets)])
if name is None:
name = uuidutils.generate_uuid()

View File

@ -43,7 +43,7 @@ class TestDestroyPatchPorts(base.BaseSudoTestCase):
config.set_override('integration_bridge', self.int_br.br_name, "OVS")
config.set_override(
'bridge_mappings',
','.join(["{}:{}".format(net, br)
','.join([f"{net}:{br}"
for net, br in bridge_mappings.items()]),
"OVS")

View File

@ -154,7 +154,7 @@ class TestExtensionsController(TestRootController):
def test_get(self):
# Fetch any extension supported by plugins
test_alias = self._get_supported_extensions().pop()
response = self.app.get('{}/{}'.format(self.base_url, test_alias))
response = self.app.get(f'{self.base_url}/{test_alias}')
self.assertEqual(response.status_int, 200)
json_body = jsonutils.loads(response.body)
self.assertEqual(test_alias, json_body['extension']['alias'])
@ -296,7 +296,7 @@ class TestQuotasController(test_functional.PecanFunctionalTest):
def test_get_project_info(self):
for key in ('project', 'tenant'):
response = self.app.get('{}/{}.json'.format(self.base_url, key),
response = self.app.get(f'{self.base_url}/{key}.json',
headers={'X-Project-Id': 'admin',
'X-Roles': 'admin'})
self.assertEqual(200, response.status_int)

View File

@ -169,7 +169,7 @@ class TestSbApi(BaseOvnIdlTest):
chassis, switch = self._add_switch(
self.data['chassis'][0]['name'])
port, binding = self._add_port_to_switch(switch)
mac_ip = '{} {}'.format(mac, ipaddr)
mac_ip = f'{mac} {ipaddr}'
pb_update_event = events.WaitForUpdatePortBindingEvent(
port.name, mac=[mac_ip])
self.handler.watch_event(pb_update_event)

View File

@ -724,7 +724,7 @@ class TestAgentMonitor(base.TestOVNFunctionalBase):
n_utils.wait_until_true(
lambda:
isinstance(neutron_agent.AgentCache().get(self.chassis_name),
neutron_agent.ControllerAgent))
neutron_agent.ControllerAgent))
# Change back to gw chassis
self.sb_api.db_set(
@ -735,7 +735,7 @@ class TestAgentMonitor(base.TestOVNFunctionalBase):
n_utils.wait_until_true(
lambda:
isinstance(neutron_agent.AgentCache().get(self.chassis_name),
neutron_agent.ControllerGatewayAgent))
neutron_agent.ControllerGatewayAgent))
def test_agent_updated_at_use_nb_cfg_timestamp(self):
def check_agent_ts():

View File

@ -372,7 +372,7 @@ class AddIpRulesTestCase(BaseIpRuleTestCase):
rules = ip_lib.list_ip_rules(self.namespace, ip_version)
self._check_rules(
rules, ['table', 'from'], [str(table), ip_address],
'table {} and "from" IP address {}'.format(table, ip_address))
f'table {table} and "from" IP address {ip_address}')
priv_ip_lib.delete_ip_rule(self.namespace, table=table,
src=ip_address, src_len=ip_lenght,

View File

@ -524,7 +524,7 @@ class TestAZAwareWeightScheduler(test_dhcp_sch.TestDhcpSchedulerBaseTestCase,
# create dhcp agents
for i in range(self.az_count):
az = 'az%s' % i
hosts = ['{}-host-{}'.format(az, j)
hosts = [f'{az}-host-{j}'
for j in range(self.agent_count[i])]
dhcp_agents = self._create_and_set_agents_down(
hosts, down_agent_count=self.down_agent_count[i], az=az)

View File

@ -324,7 +324,7 @@ class L3AZSchedulerBaseTest(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
def _create_legacy_agents(self, agent_count, down_agent_count, az):
# Creates legacy l3 agents and sets admin state based on
# down agent count.
hosts = ['{}-host-{}'.format(az, i) for i in range(agent_count)]
hosts = [f'{az}-host-{i}' for i in range(agent_count)]
l3_agents = [
self._create_l3_agent(hosts[i], self.adminContext, 'legacy',
self.l3_plugin, (i >= down_agent_count),

View File

@ -100,7 +100,7 @@ class TestLoggingExtension(LoggingExtensionTestFramework):
def _is_log_flow_set(self, table, actions):
flows = self.log_driver.int_br.br.dump_flows_for_table(table)
pattern = re.compile(
r"^.* table={}.* actions={}".format(table, actions)
fr"^.* table={table}.* actions={actions}"
)
for flow in flows.splitlines():
if pattern.match(flow.strip()):

View File

@ -68,7 +68,7 @@ class StringSetMatcher:
def __repr__(self):
sep = '' if self.separator == ',' else " on %s" % self.separator
return '<comma-separated string for {}{}>'.format(self.set, sep)
return f'<comma-separated string for {self.set}{sep}>'
class OVS_Lib_Test_Common(base.BaseTestCase):
@ -158,7 +158,7 @@ class OVS_Lib_Test(base.BaseTestCase):
('cookie', 1754),
('priority', 3),
('tun_id', lsw_id),
('actions', "mod_vlan_vid:{},output:{}".format(vid, ofport))])
('actions', f"mod_vlan_vid:{vid},output:{ofport}")])
flow_dict_7 = collections.OrderedDict([
('cookie', 1256),
('priority', 4),
@ -385,10 +385,10 @@ class OVS_Lib_Test(base.BaseTestCase):
"%s,in_port=%d" % (cookie_spec, ofport))),
self._ofctl_mock("del-flows", self.BR_NAME, '-',
process_input=StringSetMatcher(
"{},tun_id={}".format(cookie_spec, lsw_id))),
f"{cookie_spec},tun_id={lsw_id}")),
self._ofctl_mock("del-flows", self.BR_NAME, '-',
process_input=StringSetMatcher(
"{},dl_vlan={}".format(cookie_spec, vid))),
f"{cookie_spec},dl_vlan={vid}")),
self._ofctl_mock("del-flows", self.BR_NAME, '-',
process_input="%s" % cookie_spec),
]

View File

@ -50,8 +50,8 @@ from neutron.tests import base
HOSTNAME = 'hostname'
dev_man = dhcp.DeviceManager
rpc_api = dhcp_agent.DhcpPluginApi
DEVICE_MANAGER = '{}.{}'.format(dev_man.__module__, dev_man.__name__)
DHCP_PLUGIN = '{}.{}'.format(rpc_api.__module__, rpc_api.__name__)
DEVICE_MANAGER = f'{dev_man.__module__}.{dev_man.__name__}'
DHCP_PLUGIN = f'{rpc_api.__module__}.{rpc_api.__name__}'
FAKE_NETWORK_UUID = '12345678-1234-5678-1234567890ab'
FAKE_NETWORK_DHCP_NS = "qdhcp-%s" % FAKE_NETWORK_UUID
FAKE_PROJECT_ID = 'aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'

View File

@ -148,7 +148,7 @@ class MetadataPathAgentExtensionTestCase(base.BaseTestCase):
port_device_owner = "compute:test"
port_ip = "1.1.1.1"
with mock.patch.object(self.meta_ext.meta_daemon,
"config") as h_config, mock.patch.object(
"config") as h_config, mock.patch.object(
self.meta_ext.ext_api,
"get_provider_ip_info") as get_p_info:
get_p_info.return_value = {
@ -226,7 +226,7 @@ class MetadataPathAgentExtensionTestCase(base.BaseTestCase):
self.project_id = "p1"
with mock.patch.object(self.meta_ext.meta_daemon,
"config"), mock.patch.object(
"config"), mock.patch.object(
self.meta_ext.ext_api.cache_api,
"get_resource_by_id",
return_value=Port()) as get_res:
@ -263,7 +263,7 @@ class MetadataPathAgentExtensionTestCase(base.BaseTestCase):
port_device_owner = "compute:test"
port_ip = "1.1.1.1"
with mock.patch.object(self.meta_ext.meta_daemon,
"config") as h_config:
"config") as h_config:
port = {"port_id": port_id,
"fixed_ips": [{"ip_address": port_ip,
"subnet_id": "1"}],

View File

@ -79,10 +79,10 @@ class TestBasicRouterOperations(BasicRouterTestCaseFramework):
ri.external_gateway_removed(ex_gw_port, "qg-fake-name")
cidr_pri = '{}/{}'.format(gw_ip_pri, v4_prefixlen)
cidr_sec = '{}/{}'.format(gw_ip_sec, lib_constants.IPv4_BITS)
cidr_v6 = '{}/{}'.format(gw_ip6_pri, v6_prefixlen)
cidr_v6_sec = '{}/{}'.format(gw_ip6_sec, lib_constants.IPv6_BITS)
cidr_pri = f'{gw_ip_pri}/{v4_prefixlen}'
cidr_sec = f'{gw_ip_sec}/{lib_constants.IPv4_BITS}'
cidr_v6 = f'{gw_ip6_pri}/{v6_prefixlen}'
cidr_v6_sec = f'{gw_ip6_sec}/{lib_constants.IPv6_BITS}'
device.delete_addr_and_conntrack_state.assert_has_calls(
[mock.call(cidr_pri), mock.call(cidr_sec),

View File

@ -28,37 +28,37 @@ class ConntrackdConfigTestCase(BasicRouterOperationsFramework):
def get_expected(ha_confs_path):
return dedent(
"""
General {
General {{
HashSize 32768
HashLimit 131072
Syslog on
LockFile %(conf_path)s/%(uuid)s/conntrackd.lock
UNIX {
Path %(conf_path)s/%(uuid)s/conntrackd.ctl
LockFile {conf_path}/{uuid}/conntrackd.lock
UNIX {{
Path {conf_path}/{uuid}/conntrackd.ctl
Backlog 20
}
}}
SocketBufferSize 262142
SocketBufferSizeMaxGrown 655355
Filter From Kernelspace {
Protocol Accept {
Filter From Kernelspace {{
Protocol Accept {{
TCP
SCTP
DCCP
UDP
ICMP
IPv6-ICMP
}
Address Ignore {
}}
Address Ignore {{
IPv4_address 127.0.0.1
IPv6_address ::1
IPv4_address 192.168.0.5
}
}
}
Sync {
Mode FTFW {
}
Multicast Default {
}}
}}
}}
Sync {{
Mode FTFW {{
}}
Multicast Default {{
IPv4_address 225.0.0.50
IPv4_interface 192.168.0.5
Group 3783
@ -66,10 +66,8 @@ class ConntrackdConfigTestCase(BasicRouterOperationsFramework):
SndSocketBuffer 24985600
RcvSocketBuffer 24985600
Checksum on
}
}""" % {'conf_path': ha_confs_path,
'uuid': FAKE_ID,
})
}}
}}""".format(conf_path=ha_confs_path, uuid=FAKE_ID))
def get_manager(self):
return conntrackd.ConntrackdManager(

View File

@ -58,7 +58,7 @@ class FakeDNSAssignment:
self.ip_address = ip_address
self.fqdn = self.hostname
if domain:
self.fqdn = '{}.{}.'.format(self.hostname, domain)
self.fqdn = f'{self.hostname}.{domain}.'
class DhcpOpt:
@ -1180,7 +1180,7 @@ class TestDhcpBase(TestBase):
self.called.append('enable')
def disable(self, retain_port=False, block=False):
self.called.append('disable {} {}'.format(retain_port, block))
self.called.append(f'disable {retain_port} {block}')
def reload_allocations(self):
pass
@ -1449,7 +1449,7 @@ class TestDnsmasq(TestBase):
has_stateless=True, dhcp_t1=0, dhcp_t2=0,
bridged=True):
def mock_get_conf_file_name(kind):
return '/dhcp/{}/{}'.format(network.id, kind)
return f'/dhcp/{network.id}/{kind}'
# Empty string passed to --conf-file in dnsmasq is invalid
# we must force '' to '/dev/null' because the dhcp agent
@ -1692,7 +1692,7 @@ class TestDnsmasq(TestBase):
'00:00:80:aa:bb:cc 192.168.0.2 * *',
'00:00:0f:aa:bb:cc 192.168.0.3 * *',
'00:00:0f:rr:rr:rr 192.168.0.1 * *\n']
expected = "\n".join(['{} {}'.format(timestamp, le)
expected = "\n".join([f'{timestamp} {le}'
for le in expected])
with mock.patch.object(dhcp.Dnsmasq, 'get_conf_file_name') as conf_fn:
conf_fn.return_value = '/foo/leases'
@ -3620,7 +3620,7 @@ class TestDictModel(base.BaseTestCase):
self.assertIsNone(self.dm.get('a'))
def test__str(self):
reference = 'a={}, b={}'.format(self._a, self._b)
reference = f'a={self._a}, b={self._b}'
self.assertEqual(reference, str(self.dm))
def test__getitem(self):

View File

@ -950,7 +950,7 @@ class TestIpNetnsCommand(TestIPCmdBase):
self.netns_cmd.execute(['ip', 'link', 'list'], env)
execute.assert_called_once_with(
['ip', 'netns', 'exec', 'ns', 'env'] +
['{}={}'.format(k, v) for k, v in env.items()] +
[f'{k}={v}' for k, v in env.items()] +
['ip', 'link', 'list'],
run_as_root=True, check_exit_code=True, extra_ok_codes=None,
log_fail_as_error=True, privsep_exec=False)

View File

@ -67,7 +67,7 @@ class BaseIpsetManagerTest(base.BaseTestCase):
def expect_set(self, addresses):
temp_input = ['create %s hash:net family inet' % TEST_SET_NAME_NEW]
temp_input.extend('add {} {}'.format(TEST_SET_NAME_NEW, ip)
temp_input.extend(f'add {TEST_SET_NAME_NEW} {ip}'
for ip in self.ipset._sanitize_addresses(addresses))
input = '\n'.join(temp_input)
self.expected_calls.extend([

View File

@ -254,7 +254,7 @@ class TestMetadataAgent(base.BaseTestCase):
self._test__process_cidrs_when_current_namespace_empty(False)
def _test__process_cidrs_current_ns_only_contains_meta_cidr(self,
ipv6_enabled):
ipv6_enabled):
"""Current namespace cidrs only contains IPv4 metadata cidr,
and it is missing new required cidrs.
"""
@ -290,7 +290,7 @@ class TestMetadataAgent(base.BaseTestCase):
self._test__process_cidrs_current_ns_only_contains_meta_cidr(False)
def _test__process_cidrs_current_ns_contains_stale_cidr(self,
ipv6_enabled):
ipv6_enabled):
"""Current namespace cidrs contains stale cidrs, including lla,
and it is missing new required cidrs.
"""

View File

@ -80,7 +80,7 @@ def _hex_str(num):
def _hex_format(port, mask):
if mask != 0xffff:
return "{}/{}".format(_hex_str(port), _hex_str(0xffff & mask))
return f"{_hex_str(port)}/{_hex_str(0xffff & mask)}"
return _hex_str(port)

View File

@ -771,7 +771,7 @@ class ProjectManagerTests(AdminTests):
self.context, 'delete_subnet:tags', self.alt_target)
self.assertTrue(
policy.enforce(self.context, 'delete_subnet:tags',
self.alt_target_own_net))
self.alt_target_own_net))
class ProjectMemberTests(ProjectManagerTests):

Some files were not shown because too many files have changed in this diff Show More