Merge "trivial: Autofix low-hanging linter failures"

This commit is contained in:
Zuul
2025-07-15 18:41:37 +00:00
committed by Gerrit Code Review
44 changed files with 141 additions and 108 deletions

View File

@@ -37,7 +37,7 @@ repos:
rev: v0.12.1
hooks:
- id: ruff-check
args: []
args: ['--fix', '--unsafe-fixes']
- repo: https://opendev.org/openstack/hacking
rev: 7.0.0
hooks:

View File

@@ -66,7 +66,7 @@ def is_agent_down(heart_beat_time):
cfg.CONF.agent_down_time)
class _SocketWrapper():
class _SocketWrapper:
"""Determines if socket module is patched by eventlet
and unpatches it.

View File

@@ -26,7 +26,7 @@ METADATA_V4_IP = constants.METADATA_V4_IP
METADATA_V4_PORT = constants.METADATA_PORT
class MetadataDataPathFlows():
class MetadataDataPathFlows:
def set_path_br(self, path_br):
self.path_br = path_br

View File

@@ -56,7 +56,7 @@ class FailedToInitMetadataPathExtension(n_exc.NeutronException):
"metadata path, error: %(msg)s")
class MetadataPathExtensionPortInfoAPI():
class MetadataPathExtensionPortInfoAPI:
def __init__(self, cache_api):
self.cache_api = cache_api

View File

@@ -37,7 +37,7 @@ class L3AgentExtensionsManager(agent_ext_manager.AgentExtensionsManager):
super().__init__(conf, L3_AGENT_EXT_MANAGER_NAMESPACE)
extensions = []
for extension in self:
if not isinstance(extension.obj, (l3_extension.L3AgentExtension,)):
if not isinstance(extension.obj, l3_extension.L3AgentExtension):
extensions.append(extension.attr)
if extensions:
raise l3_exc.L3ExtensionException(extensions=extensions)

View File

@@ -95,7 +95,7 @@ class DictModel(collections.abc.MutableMapping):
return item
for key, value in itertools.chain(temp_dict.items(), kwargs.items()):
if isinstance(value, (list, tuple)):
if isinstance(value, list | tuple):
# Keep the same type but convert dicts to DictModels
self._dictmodel_internal_storage[key] = type(value)(
upgrade(item) for item in value

View File

@@ -130,7 +130,7 @@ class RemoteResourceCache:
for key, values in filters.items():
for value in values:
attr = getattr(obj, key)
if isinstance(attr, (list, tuple, set)):
if isinstance(attr, list | tuple | set):
# attribute is a list so we check if value is in
# list
if value in attr:

View File

@@ -586,8 +586,8 @@ def min_tx_rate_support():
ip_link = ip_lib.IpLinkCommand(device)
# NOTE(ralonsoh): to set min_tx_rate, first is needed to set
# max_tx_rate and max_tx_rate >= min_tx_rate.
vf_config = {'vf': VF_NUM, 'rate': {'min_tx_rate': int(400),
'max_tx_rate': int(500)}}
vf_config = {'vf': VF_NUM, 'rate': {'min_tx_rate': 400,
'max_tx_rate': 500}}
ip_link.set_vf_feature(vf_config)
vf_config = {'vf': VF_NUM, 'rate': {'min_tx_rate': 0,
'max_tx_rate': 0}}

View File

@@ -230,7 +230,7 @@ class NeutronObject(obj_base.VersionedObject,
def is_object_field(cls, field):
return isinstance(
cls.fields[field],
(obj_fields.ListOfObjectsField, obj_fields.ObjectField))
obj_fields.ListOfObjectsField | obj_fields.ObjectField)
@classmethod
def obj_class_from_name(cls, objname, objver):

View File

@@ -20,7 +20,7 @@ import ovs.ovsuuid
# Temporarily fix ovs.db.idl.Transaction._substitute_uuids to support handling
# the persist_uuid feature
def _substitute_uuids(self, json):
if isinstance(json, (list, tuple)):
if isinstance(json, list | tuple):
if (len(json) == 2 and
json[0] == 'uuid' and
ovs.ovsuuid.is_valid_string(json[1])):

View File

@@ -760,7 +760,7 @@ def _make_pyroute2_route_args(namespace, ip_version, cidr, device, via, table,
if isinstance(protocol, str) and protocol in rtnl.rt_proto:
protocol = rtnl.rt_proto[protocol]
args['proto'] = protocol
if isinstance(via, (list, tuple)):
if isinstance(via, list | tuple):
args['multipath'] = []
for mp in via:
multipath = {}

View File

@@ -519,12 +519,12 @@ class QoSPlugin(qos.QoSPluginBase):
# Filter out rules that can't have resources allocated in Placement
original_rules = [
r for r in original_rules
if (isinstance(r, (rule_object.QosMinimumBandwidthRule,
rule_object.QosMinimumPacketRateRule)))]
if (isinstance(r, rule_object.QosMinimumBandwidthRule |
rule_object.QosMinimumPacketRateRule))]
desired_rules = [
r for r in desired_rules
if (isinstance(r, (rule_object.QosMinimumBandwidthRule,
rule_object.QosMinimumPacketRateRule)))]
if (isinstance(r, rule_object.QosMinimumBandwidthRule |
rule_object.QosMinimumPacketRateRule))]
if not original_rules and not desired_rules:
return

View File

@@ -420,7 +420,8 @@ class Pinger:
self.interval = interval
def _wait_for_death(self):
is_dead = lambda: self.proc.poll() is not None
def is_dead():
return self.proc.poll() is not None
common_utils.wait_until_true(
is_dead, timeout=self.TIMEOUT, exception=RuntimeError(
"Ping command hasn't ended after %d seconds." % self.TIMEOUT))

View File

@@ -147,7 +147,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
'DhcpAgentNotifyAPI').start()
# Update the plugin
self.setup_coreplugin(plugin, load_plugins=False)
if isinstance(service_plugins, (list, tuple)):
if isinstance(service_plugins, list | tuple):
# Sometimes we needs these test service_plugins to be ordered.
cfg.CONF.set_override('service_plugins', service_plugins)
else:
@@ -402,7 +402,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
"json", 200, subnet['network_id']).json['ports']
used_ips = set()
if exclude:
if isinstance(exclude, (list, set, tuple)):
if isinstance(exclude, list | set | tuple):
used_ips = set(exclude)
else:
used_ips.add(exclude)
@@ -5463,11 +5463,11 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
gateway_ip='10.0.1.1',
cidr='10.0.1.0/24') as v2:
subnets = (v1, v2)
query_params = ('project_id={0}'.
query_params = ('project_id={}'.
format(network['network']['project_id']))
self._test_list_resources('subnet', subnets,
query_params=query_params)
query_params = ('project_id={0}'.
query_params = ('project_id={}'.
format(uuidutils.generate_uuid()))
self._test_list_resources('subnet', [],
query_params=query_params)

View File

@@ -30,7 +30,7 @@ metadata_opts = [
cfg.CONF.register_opts(metadata_opts)
class FakeMetadata():
class FakeMetadata:
def wsgi_app(self, env, start_response):
response_headers = [('Content-Type', 'application/json')]

View File

@@ -557,8 +557,9 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
self.assertFalse(router.iptables_manager.apply())
def _assert_metadata_chains(self, router):
metadata_port_filter = lambda rule: (
str(self.agent.conf.metadata_port) in rule.rule)
def metadata_port_filter(rule):
return (str(self.agent.conf.metadata_port) in rule.rule)
self.assertTrue(self._get_rule(router.iptables_manager,
'nat',
'PREROUTING',
@@ -778,7 +779,7 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
for route in updated_route]
for entry in routes_actual:
if entry['via']:
if isinstance(entry['via'], (list, tuple)):
if isinstance(entry['via'], list | tuple):
via_list = [{'via': hop['via']}
for hop in entry['via']]
entry['via'] = sorted(via_list, key=lambda i: i['via'])

View File

@@ -936,11 +936,13 @@ class IpRouteCommandTestCase(functional_base.BaseSudoTestCase):
not_in=False):
routes = self.device.route.list_routes(ip_version, table=table)
if not_in:
fn = lambda: cmp not in routes
def fn():
return cmp not in routes
msg = 'Route found: %s\nRoutes present: {routes}'.format(
routes=routes)
else:
fn = lambda: cmp in routes
def fn():
return cmp in routes
msg = 'Route not found: %s\nRoutes present: {routes}'.format(
routes=routes)

View File

@@ -229,8 +229,9 @@ class DHCPAgentOVSTestFramework(base.BaseSudoTestCase):
self._run_dhclient(vif_name, network)
predicate = lambda: len(
self._ip_list_for_vif(vif_name, network.namespace))
def predicate():
return len(self._ip_list_for_vif(vif_name, network.namespace))
common_utils.wait_until_true(predicate, 10)
ip_list = self._ip_list_for_vif(vif_name, network.namespace)

View File

@@ -96,11 +96,17 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self.addOnException(self.collect_flows_and_ports)
def collect_flows_and_ports(self, exc_info):
nicevif = lambda x: [f'{k}={getattr(x, k)}'
for k in ['ofport', 'port_name', 'switch',
'vif_id', 'vif_mac']]
nicedev = lambda x: [f'{k}={getattr(x, k)}'
for k in ['name', 'namespace']] + x.addr.list()
def nicevif(x):
return [
f'{k}={getattr(x, k)}'
for k in ['ofport', 'port_name', 'switch', 'vif_id', 'vif_mac']
]
def nicedev(x):
return [
f'{k}={getattr(x, k)}' for k in ['name', 'namespace']
] + x.addr.list()
details = {'flows': self.br.dump_all_flows(),
'vifs': map(nicevif, self.br.get_vif_ports()),
'src_ip': self.src_addr,

View File

@@ -312,7 +312,8 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
# bond ports don't have records in the Interface table but they do in
# the Port table
orig = self.br.get_port_name_list
new_port_name_list = lambda: orig() + ['bondport']
def new_port_name_list():
return orig() + ['bondport']
mock.patch.object(self.br, 'get_port_name_list',
new=new_port_name_list).start()
ports = self.br.get_vif_ports()
@@ -335,7 +336,8 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
# return an extra port to make sure the db list ignores it
orig = self.br.get_port_name_list
new_port_name_list = lambda: orig() + ['anotherport']
def new_port_name_list():
return orig() + ['anotherport']
mock.patch.object(self.br, 'get_port_name_list',
new=new_port_name_list).start()
ports = self.br.get_vif_port_set()

View File

@@ -968,8 +968,8 @@ class TestMaintenance(_TestMaintenanceHelper):
self.assertEqual(subnet1['cidr'], snat_rule['logical_ip'])
def test_port_forwarding(self):
fip_attrs = lambda args: {
pf_def.RESOURCE_NAME: {pf_def.RESOURCE_NAME: args}}
def fip_attrs(args):
return {pf_def.RESOURCE_NAME: {pf_def.RESOURCE_NAME: args}}
def _verify_lb(test, protocol, vip_ext_port, vip_int_port):
ovn_lbs = self._find_pf_lb(router_id, fip_id)

View File

@@ -127,7 +127,8 @@ class TestNeutronServer(base.BaseLoggingTestCase,
raise exception
# Wait at most 10 seconds to spawn workers
condition = lambda: self.workers == len(get_workers_pid())
def condition():
return self.workers == len(get_workers_pid())
utils.wait_until_true(condition, timeout=timeout, sleep=0.5,
exception=exception)
return get_workers_pid()

View File

@@ -517,7 +517,7 @@ class OVS_Lib_Test(base.BaseTestCase):
ovs_row = []
r["data"].append(ovs_row)
for cell in row:
if isinstance(cell, (str, int, list)):
if isinstance(cell, str | int | list):
ovs_row.append(cell)
elif isinstance(cell, dict):
ovs_row.append(["map", cell.items()])

View File

@@ -68,7 +68,8 @@ class TestHostMedataHAProxyDaemonMonitor(base.BaseTestCase):
host_meta.config(instance_infos)
host_meta.enable()
cmd = execute.call_args[0][0]
_join = lambda *args: ' '.join(args)
def _join(*args):
return ' '.join(args)
cmd = _join(*cmd)
self.assertIn('haproxy', cmd)
self.assertIn(_join('-f', conffile), cmd)

View File

@@ -3438,7 +3438,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
mock_delete.assert_called_once_with(pidfile, run_as_root=True)
mock_delete.reset_mock()
cmd = execute.call_args[0][0]
_join = lambda *args: ' '.join(args)
def _join(*args):
return ' '.join(args)
cmd = _join(*cmd)
self.assertIn('radvd', cmd)
self.assertIn(_join('-C', conffile), cmd)

View File

@@ -51,7 +51,8 @@ class BridgeLibTest(base.BaseTestCase):
self.execute.reset_mock()
def test_is_bridged_interface(self):
exists = lambda path: path == "/sys/class/net/tapOK/brport"
def exists(path):
return path == "/sys/class/net/tapOK/brport"
with mock.patch('os.path.exists', side_effect=exists):
self.assertTrue(bridge_lib.is_bridged_interface("tapOK"))
self.assertFalse(bridge_lib.is_bridged_interface("tapKO"))
@@ -72,7 +73,8 @@ class BridgeLibTest(base.BaseTestCase):
def test_owns_interface(self):
br = bridge_lib.BridgeDevice('br-int')
exists = lambda path: path == "/sys/class/net/br-int/brif/abc"
def exists(path):
return path == "/sys/class/net/br-int/brif/abc"
with mock.patch('os.path.exists', side_effect=exists):
self.assertTrue(br.owns_interface("abc"))
self.assertFalse(br.owns_interface("def"))

View File

@@ -103,7 +103,8 @@ class BaseIptablesFirewallTestCase(base.BaseTestCase):
# initial data has 1, 2, and 9 in use, see RAW_TABLE_OUTPUT above.
self._dev_zone_map = {'61634509-31': 4098, '8f46cf18-12': 4105,
'95c24827-02': 4098, 'e804433b-61': 4097}
get_rules_for_table_func = lambda x: RAW_TABLE_OUTPUT.split('\n')
def get_rules_for_table_func(x):
return RAW_TABLE_OUTPUT.split('\n')
filtered_ports = {port_id: self._fake_port()
for port_id in self._dev_zone_map}
self.firewall.ipconntrack = ip_conntrack.IpConntrackManager(

View File

@@ -305,7 +305,7 @@ class TestGetCmdlineFromPid(base.BaseTestCase):
mock_open = self.useFixture(
lib_fixtures.OpenFixture('/proc/%s/cmdline' % self.pid, 'process')
).mock_open
mock_open.side_effect = IOError()
mock_open.side_effect = OSError()
cmdline = utils.get_cmdline_from_pid(self.pid)
mock_open.assert_called_once_with('/proc/%s/cmdline' % self.pid)
self.assertEqual([], cmdline)

View File

@@ -58,8 +58,8 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
def test__flood_cache_for_query_pulls_once(self):
resources = [OVOLikeThing(66), OVOLikeThing(67)]
received_kw = []
receiver = lambda r, e, t, payload: \
received_kw.append(payload)
def receiver(r, e, t, payload):
return received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_UPDATE)
self._pullmock.bulk_pull.side_effect = [
@@ -117,7 +117,8 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
OVOLikeThing(4, size='xlarge'), OVOLikeThing(6, size='small')]
for goose in geese:
self.rcache.record_resource_update(self.ctx, 'goose', goose)
has_large = lambda o: 'large' in o.size
def has_large(o):
return 'large' in o.size
self.assertCountEqual([geese[0], geese[2]],
self.rcache.match_resources_with_func('goose',
has_large))
@@ -138,8 +139,8 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
def test_record_resource_update(self):
received_kw = []
receiver = lambda r, e, t, payload: \
received_kw.append(payload)
def receiver(r, e, t, payload):
return received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_UPDATE)
self.rcache.record_resource_update(self.ctx, 'goose',
OVOLikeThing(3, size='large'))
@@ -161,8 +162,8 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
def test_record_resource_delete(self):
received_kw = []
receiver = lambda r, e, t, payload: \
received_kw.append(payload)
def receiver(r, e, t, payload):
return received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_DELETE)
self.rcache.record_resource_update(self.ctx, 'goose',
OVOLikeThing(3, size='large'))
@@ -178,8 +179,8 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
def test_record_resource_delete_ignores_dups(self):
received_kw = []
receiver = lambda r, e, t, payload: \
received_kw.append(payload)
def receiver(r, e, t, payload):
return received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_DELETE)
self.rcache.record_resource_delete(self.ctx, 'goose', 3)
self.assertEqual(1, len(received_kw))

View File

@@ -2852,7 +2852,8 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
PHYSDEV_EGRESS = 'physdev-in'
def setUp(self, defer_refresh_firewall=False):
clear_mgrs = lambda: ip_conntrack.CONTRACK_MGRS.clear()
def clear_mgrs():
return ip_conntrack.CONTRACK_MGRS.clear()
self.addCleanup(clear_mgrs)
clear_mgrs() # clear before start in case other tests didn't clean up
super().setUp()

View File

@@ -23,13 +23,15 @@ class ConsumerRegistryTestCase(base.BaseTestCase):
@mock.patch.object(registry, '_get_manager')
def test_register(self, manager_mock):
callback = lambda: None
def callback():
return None
registry.register(callback, 'TYPE')
manager_mock().register.assert_called_with(callback, 'TYPE')
@mock.patch.object(registry, '_get_manager')
def test_unsubscribe(self, manager_mock):
callback = lambda: None
def callback():
return None
registry.unsubscribe(callback, 'TYPE')
manager_mock().unregister.assert_called_with(callback, 'TYPE')

View File

@@ -49,14 +49,16 @@ class ResourceCallbacksManagerTestCaseMixin:
@mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
def test_unregister_unregisters_callback(self, *mocks):
callback = lambda: None
def callback():
return None
self.mgr.register(callback, 'TYPE')
self.mgr.unregister(callback, 'TYPE')
self.assertEqual([], self.mgr.get_subscribed_types())
@mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
def test___init___does_not_reset_callbacks(self, *mocks):
callback = lambda: None
def callback():
return None
self.mgr.register(callback, 'TYPE')
resource_manager.ProducerResourceCallbacksManager()
self.assertEqual(['TYPE'], self.mgr.get_subscribed_types())
@@ -71,7 +73,8 @@ class ProducerResourceCallbacksManagerTestCase(
@mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
def test_register_registers_callback(self, *mocks):
callback = lambda: None
def callback():
return None
self.mgr.register(callback, 'TYPE')
self.assertEqual(callback, self.mgr.get_callback('TYPE'))
@@ -96,8 +99,10 @@ class ProducerResourceCallbacksManagerTestCase(
@mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
def test_get_callback_returns_proper_callback(self, *mocks):
callback1 = lambda: None
callback2 = lambda: None
def callback1():
return None
def callback2():
return None
self.mgr.register(callback1, 'TYPE1')
self.mgr.register(callback2, 'TYPE2')
self.assertEqual(callback1, self.mgr.get_callback('TYPE1'))
@@ -113,14 +118,17 @@ class ConsumerResourceCallbacksManagerTestCase(
@mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
def test_register_registers_callback(self, *mocks):
callback = lambda: None
def callback():
return None
self.mgr.register(callback, 'TYPE')
self.assertEqual({callback}, self.mgr.get_callbacks('TYPE'))
@mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
def test_register_succeeds_on_multiple_calls(self, *mocks):
callback1 = lambda: None
callback2 = lambda: None
def callback1():
return None
def callback2():
return None
self.mgr.register(callback1, 'TYPE')
self.mgr.register(callback2, 'TYPE')
@@ -132,8 +140,10 @@ class ConsumerResourceCallbacksManagerTestCase(
@mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
def test_get_callbacks_returns_proper_callbacks(self, *mocks):
callback1 = lambda: None
callback2 = lambda: None
def callback1():
return None
def callback2():
return None
self.mgr.register(callback1, 'TYPE1')
self.mgr.register(callback2, 'TYPE2')
self.assertEqual({callback1}, self.mgr.get_callbacks('TYPE1'))

View File

@@ -583,13 +583,13 @@ class SpawnWithOrWithoutProfilerTestCase(base.BaseTestCase):
@utils.SingletonDecorator
class _TestSingletonClass1():
class _TestSingletonClass1:
def __init__(self, variable):
self.variable = variable
@utils.SingletonDecorator
class _TestSingletonClass2():
class _TestSingletonClass2:
def __init__(self, variable):
self.variable = variable

View File

@@ -4257,8 +4257,8 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
events.AFTER_DELETE)
def test_router_create_precommit_event(self):
nset = lambda r, e, t, payload: \
setattr(payload.metadata['router_db'], 'name', 'hello')
def nset(r, e, t, payload):
return setattr(payload.metadata['router_db'], 'name', 'hello')
registry.subscribe(nset, resources.ROUTER, events.PRECOMMIT_CREATE)
with self.router() as r:
self.assertEqual('hello', r['router']['name'])
@@ -4303,8 +4303,8 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
def test_router_delete_precommit_event(self):
deleted = []
auditor = lambda r, e, t, payload: \
deleted.append(payload.resource_id)
def auditor(r, e, t, payload):
return deleted.append(payload.resource_id)
registry.subscribe(auditor, resources.ROUTER, events.PRECOMMIT_DELETE)
with self.router() as r:
self._delete('routers', r['router']['id'])

View File

@@ -30,5 +30,6 @@ class DataPlaneStatusDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
def setUp(self):
super().setUp()
net = self._create_test_network()
getter = lambda: self._create_test_port(network_id=net.id).id
def getter():
return self._create_test_port(network_id=net.id).id
self.update_obj_fields({'port_id': getter})

View File

@@ -118,7 +118,8 @@ class PortBindingVifDetailsTestCase(testscenarios.WithScenarios,
def setUp(self):
super().setUp()
self._create_test_network()
getter = lambda: self._create_port(network_id=self._network['id']).id
def getter():
return self._create_port(network_id=self._network['id']).id
self.update_obj_fields({'port_id': getter})
def _create_port(self, **port_attrs):

View File

@@ -69,9 +69,11 @@ class TestExtensionDriver(TestExtensionDriverBase):
assert(isinstance(result, dict))
assert(result['id'] is not None)
if expected_obj_entry_class:
assert(isinstance(entry,
(expected_db_entry_class,
expected_obj_entry_class)))
assert(
isinstance(
entry, expected_db_entry_class | expected_obj_entry_class
)
)
else:
assert(isinstance(entry, expected_db_entry_class))
assert(entry.id == result['id'])

View File

@@ -744,7 +744,7 @@ class TestPciOsWrapper(base.BaseTestCase):
esm.PciOsWrapper.NUMVFS_PATH % 'dev1')
def test_get_numvfs_no_file(self):
with mock.patch("builtins.open", side_effect=IOError()) as mock_open:
with mock.patch("builtins.open", side_effect=OSError()) as mock_open:
self.assertEqual(-1, esm.PciOsWrapper.get_numvfs('dev1'))
mock_open.assert_called_once_with(
esm.PciOsWrapper.NUMVFS_PATH % 'dev1')

View File

@@ -32,7 +32,8 @@ class OVOServerRpcInterfaceTestCase(test_plugin.Ml2PluginV2TestCase):
self.plugin = directory.get_plugin()
self.ctx = context.get_admin_context()
self.received = []
receive = lambda s, ctx, obs, evt: self.received.append((obs[0], evt))
def receive(s, ctx, obs, evt):
return self.received.append((obs[0], evt))
mock.patch('neutron.api.rpc.handlers.resources_rpc.'
'ResourcesPushRpcApi.push', new=receive).start()
# base case blocks the handler

View File

@@ -1185,9 +1185,9 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
def test_port_after_update_outside_transaction(self):
self.tx_open = True
receive = lambda r, e, t, payload: \
setattr(self, 'tx_open',
db_api.is_session_active(payload.context.session))
def receive(r, e, t, payload):
return setattr(self, 'tx_open',
db_api.is_session_active(payload.context.session))
with self.port() as p:
registry.subscribe(receive, resources.PORT, events.AFTER_UPDATE)
@@ -1197,9 +1197,9 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
def test_port_after_delete_outside_transaction(self):
self.tx_open = True
receive = lambda r, e, t, payload: \
setattr(self, 'tx_open',
db_api.is_session_active(payload.context.session))
def receive(r, e, t, payload):
return setattr(self, 'tx_open',
db_api.is_session_active(payload.context.session))
with self.port() as p:
registry.subscribe(receive, resources.PORT, events.AFTER_DELETE)
@@ -1782,9 +1782,10 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
ctx = context.get_admin_context()
b_update_events = []
a_update_events = []
b_receiver = lambda r, e, t, payload: b_update_events.append(payload)
a_receiver = lambda r, e, t, payload: \
a_update_events.append(payload.latest_state)
def b_receiver(r, e, t, payload):
return b_update_events.append(payload)
def a_receiver(r, e, t, payload):
return a_update_events.append(payload.latest_state)
registry.subscribe(b_receiver, resources.PORT,
events.BEFORE_UPDATE)
@@ -2047,8 +2048,8 @@ class TestMl2PortsV2WithRevisionPlugin(Ml2PluginV2TestCase):
**host_arg) as port:
port = plugin.get_port(ctx, port['port']['id'])
updated_ports = []
receiver = lambda r, e, t, payload: \
updated_ports.append(payload.latest_state)
def receiver(r, e, t, payload):
return updated_ports.append(payload.latest_state)
registry.subscribe(receiver, resources.PORT,
events.AFTER_UPDATE)
@@ -2061,8 +2062,8 @@ class TestMl2PortsV2WithRevisionPlugin(Ml2PluginV2TestCase):
def test_bind_port_bumps_revision(self):
updated_ports = []
created_ports = []
ureceiver = lambda r, e, t, payload: \
updated_ports.append(payload.latest_state)
def ureceiver(r, e, t, payload):
return updated_ports.append(payload.latest_state)
def creceiver(r, e, t, payload=None):
created_ports.append(payload.latest_state)

View File

@@ -41,8 +41,9 @@ class TestOVNDriverBase(base.BaseTestCase):
self.plugin_driver.nb_ovn = fake_resources.FakeOvsdbNbOvnIdl()
self.log_plugin = mock.Mock()
get_mock_log_plugin = lambda alias: self.log_plugin if (
alias == plugin_constants.LOG_API) else None
def get_mock_log_plugin(alias):
return self.log_plugin if (
alias == plugin_constants.LOG_API) else None
self.fake_get_dir_object = mock.patch(
"neutron_lib.plugins.directory.get_plugin",
side_effect=get_mock_log_plugin).start()
@@ -119,7 +120,7 @@ class TestOVNDriver(TestOVNDriverBase):
driver2 = self._log_driver_reinit()
self.assertEqual(test_log_base, driver2.meter_name)
class _fake_acl():
class _fake_acl:
def __init__(self, name=None, **acl_dict):
acl_defaults_dict = {
"name": [name] if name else [],

View File

@@ -51,7 +51,7 @@ from neutron.tests.unit import fake_resources
from neutron.tests.unit.plugins.ml2 import test_plugin as test_mech_driver
class BaseTestOVNL3RouterPluginMixin():
class BaseTestOVNL3RouterPluginMixin:
_mechanism_drivers = ['ovn']
l3_plugin = 'neutron.services.ovn_l3.plugin.OVNL3RouterPlugin'

View File

@@ -499,8 +499,9 @@ class TestOVNPortForwarding(TestOVNPortForwardingBase):
ovn_conf.register_opts()
self.pf_plugin = mock.Mock()
self.handler = mock.Mock()
get_mock_pf_plugin = lambda alias: self.pf_plugin if (
alias == plugin_constants.PORTFORWARDING) else None
def get_mock_pf_plugin(alias):
return self.pf_plugin if (
alias == plugin_constants.PORTFORWARDING) else None
self.fake_get_dir_object = mock.patch(
"neutron_lib.plugins.directory.get_plugin",
side_effect=get_mock_pf_plugin).start()

View File

@@ -25,18 +25,10 @@ target-version = "py310"
[tool.ruff.lint]
select = ["E4", "E7", "E9", "F", "S", "U"]
ignore = [
"E731", # Do not assign a `lambda` expression, use a `def`
"E741", # Ambiguous variable name: `l`
"S104", # Possible binding to all interfaces
"UP031", # Use format specifiers instead of percent format
"UP032", # Use f-string instead of `format` call
# FIXME(stephenfin): These can all be auto-fixed by ruff and
# should be enabled
"UP018", # Unnecessary `int` call (rewrite as a literal)
"UP024", # Replace aliased errors with `OSError`
"UP030", # Use implicit references for positional format fields
"UP038", # Use `X | Y` in `isinstance` call instead of `(X, Y)`
"UP039", # Unnecessary parentheses after class definition
]
[tool.ruff.lint.per-file-ignores]