Re-enable flake8

This is a partial revert of be77ed3a81. It
turns out many of the ruff rules - particularly the pycodestyle "E"
rules - are only enabled in preview mode [1]. We don't normally see this
since 'ruff format' makes these rules unnecessary. Re-enable flake8
until we decide to enable 'ruff format' (if we decide to enable it).

We run `autopep8` to fix the errors that have been introduced since that
commit and follow-up with manual fixups to get us back on the straight
and narrow.

[1] https://docs.astral.sh/ruff/rules/#error-e

Change-Id: If203af5bad388307e828d1190cfc08d335bc36a1
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane
2026-03-20 15:10:12 +01:00
parent b5f43626c6
commit beb94c0d22
106 changed files with 196 additions and 87 deletions
+1
View File
@@ -19,6 +19,7 @@ from oslo_messaging import Target
class BasePluginApi:
"""Base agent side of the rpc API"""
def __init__(self, topic, namespace, version):
target = Target(
topic=topic,
@@ -30,6 +30,7 @@ class ResourceUpdate:
Priority values are ordered from higher (0) to lower (>0) by the caller,
and are therefore not defined here, but must be done by the consumer.
"""
def __init__(self, id, priority,
action=None, resource=None, timestamp=None, tries=5):
self.priority = priority
@@ -161,6 +162,7 @@ class ExclusiveResourceProcessor:
class ResourceProcessingQueue:
"""Manager of the queue of resources to process."""
def __init__(self):
self._queue = queue.PriorityQueue()
self._run = True
+1
View File
@@ -996,6 +996,7 @@ class DhcpPluginApi(base_agent_rpc.BasePluginApi):
class NetworkCache:
"""Agent cache of the current network state."""
def __init__(self):
self.cache = {}
self.subnet_lookup = {}
@@ -62,6 +62,7 @@ class FdbPopulationAgentExtension(
"""FDB table tracker is a helper class
intended to keep track of the existing FDB rules.
"""
def __init__(self, devices):
self.device_to_macs = {}
self.portid_to_mac = {}
@@ -235,7 +235,7 @@ class MetadataPathAgentExtension(
raise RuntimeError(
_("Failed to set mac address "
"for dev %(dev)s, error: %(error)s") %
{'dev': self.META_DEV_NAME, 'error': e})
{'dev': self.META_DEV_NAME, 'error': e})
cidr = "{}/{}".format(
self.provider_gateway_ip,
@@ -41,6 +41,7 @@ class FipRulePriorityAllocator(ItemAllocator):
This class provides an allocator which saves the priorities
to a datastore which will survive L3 agent restarts.
"""
def __init__(self, data_store_path, priority_rule_start,
priority_rule_end):
"""Create the necessary pool and create the item allocator
+1
View File
@@ -64,6 +64,7 @@ class HaRouterNamespace(namespaces.RouterNamespace):
It has to be disabled on all other nodes to avoid sending MLD packets
which cause lost connectivity to Floating IPs.
"""
def create(self):
super().create(ipv6_forwarding=False)
# HA router namespaces should have ip_nonlocal_bind enabled
+1
View File
@@ -41,6 +41,7 @@ class LinkLocalAllocator(ItemAllocator):
Persisting these in the database is unnecessary and would degrade
performance.
"""
def __init__(self, data_store_path, subnet):
"""Create the necessary pool and item allocator
using ',' as the delimiter and LinkLocalAllocator as the
+1
View File
@@ -168,6 +168,7 @@ class Daemon:
Usage: subclass the Daemon class and override the run() method
"""
def __init__(self, pidfile, stdin=DEVNULL, stdout=DEVNULL,
stderr=DEVNULL, procname=sys.executable, uuid=None,
user=None, group=None):
+1
View File
@@ -57,6 +57,7 @@ class ProcessManager(MonitoredProcess):
Note: The manager expects uuid to be in cmdline.
"""
def __init__(self, conf, uuid, namespace=None, service=None,
pids_path=None, default_cmd_callback=None,
cmd_addl_env=None, pid_file=None, run_as_root=False,
+1 -1
View File
@@ -344,7 +344,7 @@ class OVSInterfaceDriver(LinuxInterfaceDriver):
raise RuntimeError(
_("Failed to set mac address to: %(mac)s; "
"Current mac: %(current_mac)s") %
{'mac': mac_address, 'current_mac': current_mac})
{'mac': mac_address, 'current_mac': current_mac})
def _ensure_device_address(self, device, mac_address):
mac_address = converters.convert_to_sanitized_mac_address(mac_address)
+1
View File
@@ -35,6 +35,7 @@ class IpConntrackUpdate:
An instance of this object carries the information necessary to
process a request to update the conntrack table.
"""
def __init__(self, device_info_list, rule, remote_ips):
self.device_info_list = device_info_list
self.rule = rule
@@ -28,6 +28,7 @@ def get_iptables_driver_instance():
class HybridIptablesHelper(
iptables_firewall.OVSHybridIptablesFirewallDriver):
"""Don't remove conntrack when removing iptables rules."""
def _remove_conntrack_entries_from_port_deleted(self, port):
pass
+1
View File
@@ -434,6 +434,7 @@ def read_if_exists(path: str, run_as_root=False) -> str:
class UnixDomainHTTPConnection(httplib.HTTPConnection):
"""Connection class for HTTP over UNIX domain socket."""
def __init__(self, host, port=None, strict=None, timeout=None,
proxy_info=None):
httplib.HTTPConnection.__init__(self, host, port, strict)
+1
View File
@@ -44,6 +44,7 @@ class MetadataPluginAPI(base_agent_rpc.BasePluginApi):
API version history:
1.0 - Initial version.
"""
def __init__(self, topic):
super().__init__(
topic=topic,
@@ -76,6 +76,7 @@ class ChassisPrivateCreateEvent(row_event.RowEvent):
to do a full sync to make sure that we capture all changes while the
connection to OVSDB was down.
"""
def __init__(self, ovn_agent):
self._first_time = True
self.ovn_agent = ovn_agent
@@ -64,6 +64,7 @@ class BGPChassisBridge(Bridge):
The BGP bridge is the provider bridge that connects a chassis to a BGP
physical interface connected to a BGP peer, typically a leaf switch.
"""
def __init__(self, bgp_agent_api, name):
super().__init__(bgp_agent_api, name)
self._requirements = [
@@ -48,6 +48,7 @@ class OVNExtensionEvent(metaclass=abc.ABCMeta):
by the OVN agent (with the "metadata" extension) and this class removed,
keeping only the compatibility with the OVN agent (to be removed in C+2).
"""
def __init__(self, *args, extension_name=None, **kwargs):
if extension_name is None:
raise OVNExtensionEventEmptyExtensionName(
+1
View File
@@ -69,6 +69,7 @@ def _sync_lock(f):
class ChassisPrivateCreateEvent(extension_manager.OVNExtensionEvent,
row_event.RowEvent):
"""Row create event - Chassis name == our_chassis."""
def __init__(self, ovn_agent):
self._first_time = True
events = (self.ROW_CREATE,)
+1
View File
@@ -279,6 +279,7 @@ class ChassisPrivateCreateEvent(row_event.RowEvent):
to do a full sync to make sure that we capture all changes while the
connection to OVSDB was down.
"""
def __init__(self, agent):
self._extension = None
self.first_time = True
+1
View File
@@ -33,6 +33,7 @@ class RemoteResourceCache:
This is currently only compatible with OVO objects that have an ID.
"""
def __init__(self, resource_types):
self.resource_types = resource_types
self._cache_by_type_and_id = {rt: {} for rt in self.resource_types}
+1
View File
@@ -80,6 +80,7 @@ class PluginReportStateAPI:
information on changing rpc interfaces, see
doc/source/contributor/internals/rpc_api.rst.
"""
def __init__(self, topic):
target = oslo_messaging.Target(topic=topic, version='1.4',
namespace=constants.RPC_NAMESPACE_STATE)
@@ -41,6 +41,7 @@ class SecurityGroupServerRpcApi:
SecurityGroupServerRpcCallback. For more information about changing rpc
interfaces, see doc/source/contributor/internals/rpc_api.rst.
"""
def __init__(self, topic):
target = oslo_messaging.Target(
topic=topic, version='1.0',
@@ -239,6 +240,7 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin):
from the updates delivered to the push notifications cache rather than
calling the server.
"""
def __init__(self, rcache):
self.rcache = rcache
registry.subscribe(self._clear_child_sg_rules, 'SecurityGroup',
+1
View File
@@ -52,6 +52,7 @@ class PidsInNamespaceException(Exception):
class FakeDhcpPlugin:
"""Fake RPC plugin to bypass any RPC calls."""
def __getattribute__(self, name):
def fake_method(*args):
pass
@@ -196,6 +196,7 @@ def migrate_neutron_dbs_to_ovn(drv):
LOG.info("Database migration from OVS to OVN by plugin %s completed",
drv.name)
def main():
"""Main method for syncing neutron networks and ports with ovn nb db.
+3 -3
View File
@@ -501,7 +501,7 @@ class CoreChecks(base.BaseChecks):
_('The following projects have duplicated HA networks: '
'%(project_ids)s. This is the list of duplicated HA '
'networks: %(network_ids)s') %
{'project_ids': project_ids, 'network_ids': network_ids})
{'project_ids': project_ids, 'network_ids': network_ids})
return upgradecheck.Result(
upgradecheck.Code.SUCCESS,
@@ -606,8 +606,8 @@ class CoreChecks(base.BaseChecks):
'resource and because of that limit creation of new '
'tags for the resources that exceed the limit will '
'not be possible until some of the tags are removed.') %
{'limit': tagging.MAX_TAGS_COUNT})
{'limit': tagging.MAX_TAGS_COUNT})
return upgradecheck.Result(
upgradecheck.Code.SUCCESS,
_('Number of tags for each resource is below the limit of %d. ') %
tagging.MAX_TAGS_COUNT)
tagging.MAX_TAGS_COUNT)
+1 -1
View File
@@ -277,7 +277,7 @@ class MetadataProxyHandlerBaseSocketServer(
title = '400 Bad Request'
msg = (_('Both network %(network)s and router %(router)s '
'defined.') %
{'network': network_id, 'router': router_id})
{'network': network_id, 'router': router_id})
LOG.warning(msg)
elif network_id:
title = '404 Not Found'
+2 -3
View File
@@ -176,7 +176,7 @@ def ovn_context(txn_var_name='txn', idl_var_name='idl'):
raise RuntimeError(
_('Could not find variables %(txn)s and %(idl)s in the method '
'signature') %
{'txn': txn_var_name, 'idl': idl_var_name})
{'txn': txn_var_name, 'idl': idl_var_name})
def retrieve_parameter(param_name, _args, _kwargs):
# Position of the parameter "param_name" in the "args" tuple.
@@ -687,7 +687,7 @@ def get_lrouter_non_gw_routes(ovn_router):
external_ids.get(constants.OVN_ROUTER_IS_EXT_GW, 'false')):
continue
#NOTE(tpsilva): only add Neutron-managed routes
# NOTE(tpsilva): only add Neutron-managed routes
if strutils.bool_from_string(
external_ids.get(constants.OVN_LRSR_EXT_ID_KEY, 'false')):
routes.append({'destination': route.ip_prefix,
@@ -1152,7 +1152,6 @@ def _sync_ha_chassis_group(nb_idl, hcg_info, txn):
"""
def get_priority(ch_name):
nonlocal priority
nonlocal hcg_info
if hcg_info.priority:
return hcg_info.priority[ch_name]
+1
View File
@@ -156,6 +156,7 @@ class exception_logger:
any occurred
"""
def __init__(self, logger=None):
self.logger = logger
@@ -263,7 +263,7 @@ ovn_opts = [
help=_('HA chassis group failover strategy. Determines how '
'aggressively OVN fails over to a new chassis when the '
'current one becomes unavailable. Supported values: %s')
% ','.join(ovn_const.OVN_HA_FAILOVER_CHOICES),
% ','.join(ovn_const.OVN_HA_FAILOVER_CHOICES),
),
cfg.IntOpt('bfd_min_rx',
default=ovn_const.OVN_BFD_MIN_RX,
+2 -2
View File
@@ -317,8 +317,8 @@ class ExtraGatewaysDbOnlyMixin(l3_gwmode_db.L3_NAT_dbonly_mixin):
reason=_('could not match a gateway port attached to '
'network %(net_id)s based on the specified '
'fixed IPs %(fips)s') %
{'net_id': net_id,
'fips': gw_info['external_fixed_ips']})
{'net_id': net_id,
'fips': gw_info['external_fixed_ips']})
nonexistent_port_info.append(gw_info)
return matched_port_ids, part_matched_port_ids, nonexistent_port_info
@@ -135,9 +135,9 @@ def upgrade():
raise Exception(_('The following tables have not been dropped '
'from the Neutron database: %(missed_tables)s.\n'
'List of errors: %(errors)s') %
{'missed_tables': ', '.join(missed_tables),
'errors': '\n'.join(errors)}
)
{'missed_tables': ', '.join(missed_tables),
'errors': '\n'.join(errors)}
)
tables_to_drop = missed_tables
+1
View File
@@ -173,6 +173,7 @@ def network_segments_exist_in_range(context, network_type, physical_network,
return bool(network_obj.NetworkSegment.count_segments(
context, network_type, physical_network, segment_range=segment_range))
def min_max_actual_segments_in_range(context, network_type, physical_network,
segment_range=None):
"""Return the minimum and maximum segmentation IDs used in a network
+4
View File
@@ -39,6 +39,7 @@ class SubnetRequest(metaclass=abc.ABCMeta):
that is common to any type of request. This class shouldn't be
instantiated on its own. Rather, a subclass of this class should be used.
"""
def __init__(self, project_id, subnet_id,
gateway_ip=None, allocation_pools=None,
set_gateway_ip=True):
@@ -193,6 +194,7 @@ class SpecificSubnetRequest(SubnetRequest):
allocation, even overlapping ones. This can be expanded on by future
blueprints.
"""
def __init__(self, project_id, subnet_id, subnet_cidr,
gateway_ip=None, allocation_pools=None,
set_gateway_ip=True):
@@ -230,6 +232,7 @@ class AddressRequest(metaclass=abc.ABCMeta):
class SpecificAddressRequest(AddressRequest):
"""For requesting a specified address from IPAM"""
def __init__(self, address):
"""Initialize SpecificAddressRequest
@@ -246,6 +249,7 @@ class SpecificAddressRequest(AddressRequest):
class BulkAddressRequest(AddressRequest):
"""For requesting a batch of available addresses from IPAM"""
def __init__(self, num_addresses):
"""Initialize BulkAddressRequest
:param num_addresses: The quantity of IP addresses being requested
+1
View File
@@ -112,6 +112,7 @@ class Pager:
This class represents a pager object. It is consumed by get_objects to
specify sorting and pagination criteria.
'''
def __init__(self, sorts=None, limit=None, page_reverse=None, marker=None):
'''Initialize
+2
View File
@@ -27,6 +27,7 @@ LOG = log.getLogger(__name__)
class InstanceSnapshot:
"""Used to avoid holding references to DB objects in PortContext."""
def __init__(self, obj):
self._model_class = obj.__class__
self._identity_key = sqlalchemy.orm.util.identity_key(instance=obj)[1]
@@ -58,6 +59,7 @@ class InstanceSnapshot:
class MechanismDriverContext:
"""MechanismDriver context base class."""
def __init__(self, plugin, plugin_context):
self._plugin = plugin
# This temporarily creates a reference loop, but the
@@ -19,6 +19,7 @@ import abc
class NetworkSegment:
"""Represents a Neutron network segment"""
def __init__(self, network_type, physical_network, segmentation_id,
mtu=None):
self.network_type = network_type
@@ -33,6 +34,7 @@ class CommonAgentManagerRpcCallBackBase(metaclass=abc.ABCMeta):
This class must be inherited by a RPC callback class that is used
in combination with the common agent.
"""
def __init__(self, context, agent, sg_agent):
self.context = context
self.agent = agent
@@ -691,8 +691,8 @@ class OVNMechanismDriver(api.MechanismDriver):
err_msg = (
_("IP addresses '%(ips)s' already used by the '%(dist)s' "
"port(s) in the same network") %
{'ips': ";".join(common_ips),
'dist': const.DEVICE_OWNER_DISTRIBUTED}
{'ips': ";".join(common_ips),
'dist': const.DEVICE_OWNER_DISTRIBUTED}
)
raise n_exc.InvalidInput(error_message=err_msg)
@@ -905,7 +905,7 @@ class UpdateObjectExtIdsCommand(command.BaseCommand, metaclass=abc.ABCMeta):
raise RuntimeError(
_("%(table)s %(record)s does not exist. "
"Cannot update external IDs") %
{'table': self.table, 'record': self.record})
{'table': self.table, 'record': self.record})
for ext_id_key, ext_id_value in self.external_ids.items():
obj.setkey('external_ids', ext_id_key, ext_id_value)
@@ -682,7 +682,7 @@ class OvsdbNbOvnIdl(nb_impl_idl.OvnNbApiIdlImpl, Backend):
for row in self.db_find_rows(
'DHCP_Options',
('external_ids', '=', {'subnet_id': subnet_id})
).execute(check_error=True):
).execute(check_error=True):
port_id = row.external_ids.get('port_id')
if with_ports and port_id:
ports.append(self._format_dhcp_row(row))
@@ -698,7 +698,7 @@ class OvsdbNbOvnIdl(nb_impl_idl.OvnNbApiIdlImpl, Backend):
for row in self.db_find_rows(
'DHCP_Options',
('external_ids', '=', {'subnet_id': subnet_id})
).execute(check_error=True):
).execute(check_error=True):
if not row.external_ids.get('port_id'):
ret_opts.append(self._format_dhcp_row(row))
break
@@ -959,9 +959,10 @@ class DBInconsistenciesPeriodics(SchemaAwarePeriodicsBase):
net_id = utils.get_neutron_name(ls.name)
physnet = net_physnet[net_id]
if (ovn_const.OVN_NETTYPE_EXT_ID_KEY not in ls.external_ids or
(ovn_const.OVN_PHYSNET_EXT_ID_KEY not in ls.external_ids
and physnet)):
if ovn_const.OVN_NETTYPE_EXT_ID_KEY not in ls.external_ids or (
ovn_const.OVN_PHYSNET_EXT_ID_KEY not in ls.external_ids and
physnet
):
external_ids = {
ovn_const.OVN_NETTYPE_EXT_ID_KEY: net_type[net_id]}
if physnet:
@@ -1193,8 +1194,8 @@ class DBInconsistenciesPeriodics(SchemaAwarePeriodicsBase):
txn.add(cmd)
raise periodics.NeverAgain()
# TODO(ralonsoh): to remove in G+4 (2028.1) cycle (2nd next SLURP release)
@has_lock_periodic(
periodic_run_limit=ovn_const.MAINTENANCE_TASK_RETRY_LIMIT,
spacing=ovn_const.MAINTENANCE_ONE_RUN_TASK_SPACING,
@@ -2034,7 +2034,7 @@ class OVNClient:
provider_net = self._plugin.get_network(
context, gw_port['network_id'])
self.set_gateway_mtu(context, provider_net, txn=txn,
router_id=router_id)
router_id=router_id)
if _has_separate_snat_per_subnet(router):
for sid in subnet_ids:
@@ -447,10 +447,10 @@ class OvnNbSynchronizer(db_sync_base.BaseOvnDbSynchronizer):
else:
external_ids = ovn_route.get('external_ids', {})
#NOTE(tpsilva): only add Neutron-managed routes
# NOTE(tpsilva): only add Neutron-managed routes
if (strutils.bool_from_string(
external_ids.get(
ovn_const.OVN_LRSR_EXT_ID_KEY, 'false')) or
ovn_const.OVN_LRSR_EXT_ID_KEY, 'false')) or
strutils.bool_from_string(
external_ids.get(
ovn_const.OVN_ROUTER_IS_EXT_GW, 'false'))):
@@ -536,6 +536,7 @@ class LogicalSwitchPortUpdateUpEvent(row_event.RowEvent):
This happens when the VM goes up.
"""
def __init__(self, driver):
self.driver = driver
table = 'Logical_Switch_Port'
@@ -566,6 +567,7 @@ class LogicalSwitchPortUpdateDownEvent(row_event.RowEvent):
This happens when the VM goes down or the port is disabled.
"""
def __init__(self, driver):
self.driver = driver
table = 'Logical_Switch_Port'
@@ -594,6 +596,7 @@ class LogicalSwitchPortUpdateDownEvent(row_event.RowEvent):
class LogicalSwitchPortUpdateLogicalRouterPortEvent(row_event.RowEvent):
"""Row update event - Logical_Switch_Port, that updates the sibling LRP"""
def __init__(self, driver):
self.driver = driver
table = 'Logical_Switch_Port'
@@ -655,6 +658,7 @@ class PortBindingUpdateVirtualPortsEvent(row_event.RowEvent):
The goal of this event is to catch the events of the virtual ports and
update the hostname in the related "portbinding" register.
"""
def __init__(self, driver):
self.driver = driver
table = 'Port_Binding'
@@ -713,6 +717,7 @@ class FIPAddDeleteEvent(row_event.RowEvent):
This happens when a FIP is created or removed.
"""
def __init__(self, driver):
self.driver = driver
table = 'NAT'
@@ -736,6 +741,7 @@ class HAChassisGroupRouterEvent(row_event.RowEvent):
updated), those routers with a HA_Chassis_Group related should update the
"LR.options.chassis" value.
"""
def __init__(self, driver):
self.driver = driver
table = 'HA_Chassis_Group'
+1 -1
View File
@@ -303,7 +303,7 @@ class TypeManager(stevedore.named.NamedExtensionManager):
driver = self.drivers.get(network_type)
if isinstance(driver.obj, api.TypeDriver):
return driver.obj.allocate_tenant_segment(context.session,
filters)
filters)
return driver.obj.allocate_tenant_segment(context, filters)
def _allocate_project_net_segment(self, context, filters=None):
+1
View File
@@ -266,6 +266,7 @@ class OwnerCheck(policy.Check):
in the latter case it leverages the plugin to load the referenced
resource and perform the check.
"""
def __init__(self, kind, match):
if kind == 'tenant_id':
LOG.warning(
+1
View File
@@ -362,6 +362,7 @@ class AZLeastRoutersScheduler(LeastRoutersScheduler):
If a router is ha router, allocate L3 agents distributed AZs
according to router's az_hints.
"""
def _get_az_hints(self, router):
return utils.get_az_hints(router)
+5
View File
@@ -115,6 +115,7 @@ class _LrAddCommand(nb_cmd.LrAddCommand):
columns in the existing row are the same as the columns we are trying to
set.
"""
def run_idl(self, txn):
try:
self.row_result = self.result = self.api.lookup(
@@ -147,6 +148,7 @@ class _LrpAddCommand(nb_cmd.LrpAddCommand):
columns in the existing row are the same as the columns we are trying to
set.
"""
def __init__(
self, api, router_name, lrp_name, networks=None, **kwargs):
networks = networks or []
@@ -176,6 +178,7 @@ class _HAChassisGroupAddCommand(nb_cmd.HAChassisGroupAddCommand):
if the columns in the existing row are the same as the columns we are
trying to set.
"""
def run_idl(self, txn):
try:
self.row_result = self.result = self.api.lookup(
@@ -192,6 +195,7 @@ class _HAChassisGroupAddCommand(nb_cmd.HAChassisGroupAddCommand):
class _LrPolicyAddCommand(nb_cmd.LrPolicyAddCommand):
"""An idempotent command to add a logical router policy.
"""
def __init__(self, api, router, priority, match, action, output_port,
*args, **kwargs):
@@ -619,6 +623,7 @@ class ReconcileChassisPeerCommand(ovs_cmd.BaseCommand):
rerouted with ECMP to the peer IP, that typically resides on the
neighboring physical switch.
"""
def __init__(
self, api, chassis, network_name):
super().__init__(api)
+2
View File
@@ -30,6 +30,7 @@ class LogicalRouterPortEvent(row_event.RowEvent):
HA_Chassis_Group, matching the Logical_Router Gateway_Chassis.
See LP#2125553.
"""
def __init__(self, driver):
self.driver = driver
self.l3_plugin = directory.get_plugin(constants.L3)
@@ -95,6 +96,7 @@ class LogicalRouterPortGatewayChassisEvent(row_event.RowEvent):
When the Gateway_Chassis list of a Logical_Router_Port changes, it is
needed to update the linked HA_Chassis_Group registers.
"""
def __init__(self, driver):
self.driver = driver
self.l3_plugin = directory.get_plugin(constants.L3)
+3 -3
View File
@@ -485,9 +485,9 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
msg=_("There is a port collision with the %(key)s. The "
"following ranges collide: %(existing)s and "
"%(port)s") %
{'key': port_key,
'existing': existing_port,
'port': port})
{'key': port_key,
'existing': existing_port,
'port': port})
def _check_port_collisions(self, context, floatingip_id, pf_dict,
id=None, internal_port_id=None,
@@ -157,7 +157,6 @@ def get_provider_driver_class(driver, namespace=SERVICE_PROVIDERS):
def parse_service_provider_opt(service_module='neutron', service_type=None):
"""Parse service definition opts and returns result."""
def validate_name(name):
if len(name) > db_const.NAME_FIELD_SIZE:
+2 -2
View File
@@ -520,11 +520,11 @@ class QoSPlugin(qos.QoSPluginBase):
original_rules = [
r for r in original_rules
if (isinstance(r, rule_object.QosMinimumBandwidthRule |
rule_object.QosMinimumPacketRateRule))]
rule_object.QosMinimumPacketRateRule))]
desired_rules = [
r for r in desired_rules
if (isinstance(r, rule_object.QosMinimumBandwidthRule |
rule_object.QosMinimumPacketRateRule))]
rule_object.QosMinimumPacketRateRule))]
if not original_rules and not desired_rules:
return
@@ -61,6 +61,7 @@ class TrunkBridge(ovs_lib.OVSBridge):
A trunk bridge has a name that follows a specific naming convention.
"""
def __init__(self, trunk_id):
name = utils.gen_trunk_br_name(trunk_id)
super().__init__(name)
@@ -49,6 +49,7 @@ class ResourceAllocator:
instances of it may be initialized and used. A pool of resources
is identified solely by the 'resource_name' argument.
"""
def __init__(self, resource_name, allocator_function, validator=None):
"""Initialize a resource allocator.
+1
View File
@@ -213,6 +213,7 @@ class TestTimer:
timeout exception. The goal of this class is to use the SIGALRM event
without affecting the test case timeout counter.
"""
def __init__(self, timeout):
self._timeout = int(timeout)
self._old_handler = None
+4 -1
View File
@@ -482,7 +482,6 @@ class NetcatTester:
def __init__(self, client_namespace, server_namespace, address,
dst_port, protocol, server_address=None, src_port=None):
"""Initialize NetcatTester
Tool for testing connectivity on transport layer using netcat
@@ -731,6 +730,7 @@ class MacvtapFixture(fixtures.Fixture):
:ivar ip_dev: created macvtap
:type ip_dev: IPDevice
"""
def __init__(self, src_dev=None, mode=None, prefix=MACVTAP_PREFIX):
super().__init__()
self.src_dev = src_dev
@@ -831,6 +831,7 @@ class OVSMetaBridgeFixture(fixtures.Fixture):
:ivar bridge: created bridge
:type bridge: OVSBridge
"""
def __init__(self, name):
super().__init__()
self.name = name
@@ -843,6 +844,7 @@ class OVSMetaBridgeFixture(fixtures.Fixture):
class OVSTrunkBridgeFixture(OVSBridgeFixture):
"""This bridge doesn't generate the name."""
def _setUp(self):
ovs = ovs_lib.BaseOVS()
self.bridge = ovs.add_bridge(self.prefix)
@@ -961,6 +963,7 @@ class LinuxBridgeFixture(fixtures.Fixture):
:ivar namespace: created bridge namespace
:type namespace: str
"""
def __init__(self, prefix=BR_PREFIX, namespace=UNDEFINED,
prefix_is_full_name=False):
super().__init__()
@@ -35,7 +35,6 @@ from neutron_lib import fixture
from neutron_lib.plugins import directory
from neutron_lib.services.qos import constants as qos_const
from neutron_lib.utils import helpers
from neutron_lib.utils import net
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_utils import importutils
@@ -2050,9 +2049,10 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
# simulate duplicate mac generation to make sure DBDuplicate is retried
responses = ['12:34:56:78:00:00', '12:34:56:78:00:00',
'12:34:56:78:00:01']
with mock.patch.object(
net, 'random_mac_generator',
return_value=itertools.cycle(responses)) as grand_mac:
with mock.patch(
'neutron_lib.utils.net.random_mac_generator',
return_value=itertools.cycle(responses)
) as grand_mac:
with self.subnet() as s:
with self.port(subnet=s) as p1, self.port(subnet=s) as p2:
self.assertEqual('12:34:56:78:00:00',
@@ -6825,6 +6825,7 @@ class TestSubnetPoolsV2(NeutronDbPluginV2TestCase):
class DbModelMixin:
"""DB model tests."""
def test_make_network_dict_outside_engine_facade_manager(self):
mock.patch.object(directory, 'get_plugin').start()
ctx = context.get_admin_context()
@@ -50,6 +50,7 @@ class ConfigFixture(config_fixtures.ConfigFileFixture):
then the dynamic configuration values won't change. The correct usage
is initializing a new instance of the class.
"""
def __init__(self, env_desc, host_desc, temp_dir, base_filename):
super().__init__(
base_filename, config_fixtures.ConfigDict(), temp_dir)
@@ -36,6 +36,7 @@ class EnvironmentDescription:
Does the setup, as a whole, support tunneling? How about l2pop?
"""
def __init__(self, network_type='vxlan', l2_pop=True, qos=False,
mech_drivers='openvswitch',
service_plugins='router', arp_responder=False,
@@ -97,6 +98,7 @@ class HostDescription:
What agents should the host spawn? What mode should each agent operate
under?
"""
def __init__(self, l3_agent=False, dhcp_agent=False,
l2_agent_type=constants.AGENT_TYPE_OVS,
firewall_driver='noop', availability_zone=None,
@@ -73,7 +73,7 @@ class TestOVNNeutronAgentBase(base.TestOVNFunctionalBase):
for extension in ovn_agent.ext_manager:
self.ovs_idl_events += extension.obj.ovs_idl_events
self.ovs_idl_events = [e(ovn_agent) for e in
set(self.ovs_idl_events)]
set(self.ovs_idl_events)]
ovsdb = impl_idl.api_factory()
ovsdb.idl.notify_handler.watch_events(self.ovs_idl_events)
@@ -47,7 +47,7 @@ class BridgeMonitorTestCase(base.BaseSudoTestCase):
try:
common_utils.wait_until_true(
lambda: set(bridges_to_monitor) ==
set(_idl_mon._bridges_added_list),
set(_idl_mon._bridges_added_list),
timeout=10)
except common_utils.WaitTimeout:
self.fail(f'Bridges to monitor: {set(bridges_to_monitor)}, '
@@ -308,7 +308,6 @@ class TestOVSAgent(base.OVSAgentTestFramework):
time.sleep(0.25)
def test_noresync_after_port_gone(self):
'''This will test the scenario where a port is removed after listing
it but before getting vif info about it.
'''
@@ -323,6 +323,7 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
# bond ports don't have records in the Interface table but they do in
# the Port table
orig = self.br.get_port_name_list
def new_port_name_list():
return orig() + ['bondport']
mock.patch.object(self.br, 'get_port_name_list',
@@ -347,6 +348,7 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
# return an extra port to make sure the db list ignores it
orig = self.br.get_port_name_list
def new_port_name_list():
return orig() + ['anotherport']
mock.patch.object(self.br, 'get_port_name_list',
+1
View File
@@ -37,6 +37,7 @@ load_tests = testlib_api.module_load_tests
class IpamTestCase(testlib_api.SqlTestCase, testlib_api.MySQLTestCaseMixin):
"""Base class for tests that aim to test ip allocation."""
def setUp(self):
super().setUp()
cfg.CONF.set_override('notify_nova_on_port_status_changes', False)
@@ -63,7 +63,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
maintenance.DBInconsistenciesPeriodics, 'has_lock',
mock.PropertyMock(return_value=True))
self.mock_has_lock = self._mock_has_lock.start()
self._mock_set_lock =mock.patch.object(
self._mock_set_lock = mock.patch.object(
ovsdb_monitor.BaseOvnIdl, 'set_lock')
self.mock_set_lock = self._mock_set_lock.start()
super().setUp(maintenance_worker=True)
@@ -1402,10 +1402,10 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
'neutron-' + str(router_id), None)
all_lports = getattr(lrouter, 'ports', [])
managed_lports = [
lport for lport in all_lports
if (ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY in
lport.external_ids)
]
lport for lport in all_lports
if (ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY in
lport.external_ids)
]
plugin_lrouter_port_ids = [lport.name.replace('lrp-', '')
for lport in managed_lports]
@@ -1438,10 +1438,10 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
'neutron-' + router_id, None)
all_lports = getattr(lrouter, 'ports', [])
managed_lports = [
lport for lport in all_lports
if (ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY in
lport.external_ids)
]
lport for lport in all_lports
if (ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY in
lport.external_ids)
]
monitor_lrouter_port_ids = [lport.name.replace('lrp-', '')
for lport in managed_lports]
monitor_lport_networks = {
@@ -2082,7 +2082,7 @@ class TestOvnSbSync(base.TestOVNFunctionalBase):
maintenance.DBInconsistenciesPeriodics, 'has_lock',
mock.PropertyMock(return_value=True))
self.mock_has_lock = self._mock_has_lock.start()
self._mock_set_lock =mock.patch.object(
self._mock_set_lock = mock.patch.object(
ovsdb_monitor.BaseOvnIdl, 'set_lock')
self.mock_set_lock = self._mock_set_lock.start()
super().setUp(maintenance_worker=True)
@@ -47,7 +47,7 @@ def _initialize_network_segment_range_support(type_driver, start_time):
class VlanTypeDriverBaseTestCase(testlib_api.MySQLTestCaseMixin,
testlib_api.SqlTestCase):
testlib_api.SqlTestCase):
def setUp(self):
super().setUp()
cfg.CONF.register_opts(common_config.core_opts)
@@ -27,6 +27,7 @@ class SanityTestCase(base.BaseLoggingTestCase):
neutron-sanity-check runs without throwing an exception, as in the case
where someone modifies the API without updating the check script.
"""
def setUp(self):
super().setUp()
# needed for test_dnsmasq_version()
@@ -2001,7 +2001,7 @@ class L3DvrTestCase(L3DvrTestCaseBase):
self.l3_plugin.l3_rpc_notifier.router_removed_from_agent.\
assert_called_once_with(
mock.ANY, router['id'], HOST1)
mock.ANY, router['id'], HOST1)
def test_router_auto_scheduling(self):
router = self._create_router()
@@ -27,8 +27,8 @@ from neutron.tests.unit.extensions import test_l3
class TestLogicalRouterPortEvent(
base.TestOVNFunctionalBase,
test_l3.L3NatTestCaseMixin):
base.TestOVNFunctionalBase,
test_l3.L3NatTestCaseMixin):
def setUp(self, **kwargs):
super().setUp(**kwargs)
@@ -194,8 +194,8 @@ class TestLogicalRouterPortEvent(
class TestLogicalRouterPortGatewayChassisEvent(
base.TestOVNFunctionalBase,
test_l3.L3NatTestCaseMixin):
base.TestOVNFunctionalBase,
test_l3.L3NatTestCaseMixin):
def setUp(self, **kwargs):
super().setUp(**kwargs)
@@ -426,7 +426,7 @@ class TestRouter(base.TestOVNFunctionalBase):
# we can test how many calls to the northbound idl we are using in
# in order to create the new routers
with mock.patch.object(ovn_client._nb_idl, 'get_lrouter_port',
wraps=ovn_client._nb_idl.get_lrouter_port) as nb_glrp:
wraps=ovn_client._nb_idl.get_lrouter_port) as nb_glrp:
router = self._create_router('router-multi-gw%d' % i)
self._add_external_gateways(
router['id'],
@@ -39,6 +39,7 @@ class OVSDBHandlerTestCase(base.OVSAgentTestFramework):
This suite aims for interaction between events coming from OVSDB monitor,
agent and wiring ports via trunk bridge to integration bridge.
"""
def setUp(self):
"""Prepare resources.
@@ -56,6 +56,7 @@ class StringSetMatcher:
Example: "a,b,45" == "b,45,a"
"""
def __init__(self, string, separator=','):
self.separator = separator
self.set = set(string.split(self.separator))
+1 -1
View File
@@ -735,7 +735,7 @@ class TestDhcpAgent(base.BaseTestCase):
dhcp,
'safe_get_network_info',
return_value=new_fake_network
), \
), \
mock.patch.object(
dhcp, 'update_isolated_metadata_proxy') as ump:
dhcp.configure_dhcp_for_network(fake_network)
@@ -74,6 +74,7 @@ class TestHostMedataHAProxyDaemonMonitor(base.BaseTestCase):
host_meta.config(instance_infos)
host_meta.enable()
cmd = execute.call_args[0][0]
def _join(*args):
return ' '.join(args)
cmd = _join(*cmd)
@@ -3445,6 +3445,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
mock_delete.assert_called_once_with(pidfile, run_as_root=True)
mock_delete.reset_mock()
cmd = execute.call_args[0][0]
def _join(*args):
return ' '.join(args)
cmd = _join(*cmd)
@@ -73,6 +73,7 @@ class BridgeLibTest(base.BaseTestCase):
def test_owns_interface(self):
br = bridge_lib.BridgeDevice('br-int')
def exists(path):
return path == "/sys/class/net/br-int/brif/abc"
with mock.patch('os.path.exists', side_effect=exists):
@@ -104,6 +104,7 @@ class BaseIptablesFirewallTestCase(base.BaseTestCase):
# initial data has 1, 2, and 9 in use, see RAW_TABLE_OUTPUT above.
self._dev_zone_map = {'61634509-31': 4098, '8f46cf18-12': 4105,
'95c24827-02': 4098, 'e804433b-61': 4097}
def get_rules_for_table_func(x):
return RAW_TABLE_OUTPUT.split('\n')
filtered_ports = {port_id: self._fake_port()
@@ -1960,6 +1961,7 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
Copied verbatim from unittest.mock documentation.
"""
def __call__(self, *args, **kwargs):
args = copy.deepcopy(args)
kwargs = copy.deepcopy(kwargs)
@@ -58,6 +58,7 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
def test__flood_cache_for_query_pulls_once(self):
resources = [OVOLikeThing(66), OVOLikeThing(67)]
received_kw = []
def receiver(r, e, t, payload):
return received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_UPDATE)
@@ -117,6 +118,7 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
OVOLikeThing(4, size='xlarge'), OVOLikeThing(6, size='small')]
for goose in geese:
self.rcache.record_resource_update(self.ctx, 'goose', goose)
def has_large(o):
return 'large' in o.size
self.assertCountEqual([geese[0], geese[2]],
@@ -139,6 +141,7 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
def test_record_resource_update(self):
received_kw = []
def receiver(r, e, t, payload):
return received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_UPDATE)
@@ -162,6 +165,7 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
def test_record_resource_delete(self):
received_kw = []
def receiver(r, e, t, payload):
return received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_DELETE)
@@ -179,6 +183,7 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
def test_record_resource_delete_ignores_dups(self):
received_kw = []
def receiver(r, e, t, payload):
return received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_DELETE)
@@ -116,6 +116,7 @@ class FakeFirewallDriver(firewall_base.FirewallDriver):
FirewallDriver is base class for other types of drivers. To be able to
use it in tests, it's needed to overwrite all abstract methods.
"""
def prepare_port_filter(self, port):
raise NotImplementedError()
@@ -101,6 +101,7 @@ class ProducerResourceCallbacksManagerTestCase(
def test_get_callback_returns_proper_callback(self, *mocks):
def callback1():
return None
def callback2():
return None
self.mgr.register(callback1, 'TYPE1')
@@ -127,6 +128,7 @@ class ConsumerResourceCallbacksManagerTestCase(
def test_register_succeeds_on_multiple_calls(self, *mocks):
def callback1():
return None
def callback2():
return None
self.mgr.register(callback1, 'TYPE')
@@ -142,6 +144,7 @@ class ConsumerResourceCallbacksManagerTestCase(
def test_get_callbacks_returns_proper_callbacks(self, *mocks):
def callback1():
return None
def callback2():
return None
self.mgr.register(callback1, 'TYPE1')
@@ -610,6 +610,7 @@ class ExtensionManagerTest(base.BaseTestCase):
This Extension doesn't implement extension methods :
get_name, get_description and get_updated
"""
def get_alias(self):
return "invalid_extension"
@@ -824,6 +825,7 @@ class PluginAwareExtensionManagerTest(base.BaseTestCase):
This will work with any plugin implementing NeutronPluginBase
"""
def get_plugin_interface(self):
return None
+1
View File
@@ -38,6 +38,7 @@ from neutron.tests.unit import tests
class _PortRange:
"""A linked list of port ranges."""
def __init__(self, base, prev_ref=None):
self.base = base
self.mask = 0xffff
@@ -59,6 +59,7 @@ class TestResource:
class TestTrackedResource(resource.TrackedResource):
"""Describes a test tracked resource for detailed quota checking"""
def __init__(self, name, model_class, flag=None,
plural_name=None):
super().__init__(
@@ -71,6 +72,7 @@ class TestTrackedResource(resource.TrackedResource):
class TestCountableResource(resource.CountableResource):
"""Describes a test countable resource for detailed quota checking"""
def __init__(self, name, count, flag=-1, plural_name=None):
super().__init__(
name, count, flag=flag, plural_name=None)
+1
View File
@@ -4303,6 +4303,7 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
def test_router_delete_precommit_event(self):
deleted = []
def auditor(r, e, t, payload):
return deleted.append(payload.resource_id)
registry.subscribe(auditor, resources.ROUTER, events.PRECOMMIT_DELETE)
@@ -177,6 +177,7 @@ class ServiceTypeManagerTestCase(testlib_api.SqlTestCase):
class TestServiceTypeExtensionManager:
"""Mock extensions manager."""
def get_resources(self):
return (servicetype.Servicetype.get_resources() +
dp.Dummy.get_resources())
@@ -232,6 +233,7 @@ class ServiceTypeExtensionTestCase(ServiceTypeExtensionTestCaseBase):
class ServiceTypeManagerExtTestCase(ServiceTypeExtensionTestCaseBase):
"""Tests ServiceTypemanager as a public API."""
def setUp(self):
self.service_providers = mock.patch.object(
provconf.NeutronModule, 'service_providers').start()
@@ -30,6 +30,7 @@ class DataPlaneStatusDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
def setUp(self):
super().setUp()
net = self._create_test_network()
def getter():
return self._create_test_port(network_id=net.id).id
self.update_obj_fields({'port_id': getter})
+1
View File
@@ -118,6 +118,7 @@ class PortBindingVifDetailsTestCase(testscenarios.WithScenarios,
def setUp(self):
super().setUp()
self._create_test_network()
def getter():
return self._create_port(network_id=self._network['id']).id
self.update_obj_fields({'port_id': getter})
@@ -25,6 +25,7 @@ class LoggerMechanismDriver(api.MechanismDriver):
Generally used for testing and debugging.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._supported_extensions = set()
@@ -197,14 +197,14 @@ class TestNBImplIdlOvn(TestDBImplIdlOvn):
'external_ids': {
ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY:
utils.ovn_name('lr-id-b'),
}, 'networks': ['20.0.2.0/24'],
'options': {ovn_const.OVN_GATEWAY_CHASSIS_KEY: 'host-2'}},
}, 'networks': ['20.0.2.0/24'],
'options': {ovn_const.OVN_GATEWAY_CHASSIS_KEY: 'host-2'}},
{'name': utils.ovn_lrouter_port_name('orp-id-b3'),
'external_ids': {
ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY:
utils.ovn_name('lr-id-b'),
}, 'networks': ['20.0.3.0/24'],
'options': {}},
}, 'networks': ['20.0.3.0/24'],
'options': {}},
{'name': utils.ovn_lrouter_port_name('gwc'),
'external_ids': {
ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY:
@@ -215,9 +215,9 @@ class TestNBImplIdlOvn(TestDBImplIdlOvn):
{'name': utils.ovn_lrouter_port_name('not-managed'),
'external_ids': {
'owner': 'not-owned-by-neutron',
},
'networks': ['10.0.5.0/24'],
'options': {}}],
},
'networks': ['10.0.5.0/24'],
'options': {}}],
'gateway_chassis': [
{'chassis_name': 'fake-chassis',
'name': utils.ovn_lrouter_port_name('gwc') + '_fake-chassis'}],
@@ -33,6 +33,7 @@ from neutron.tests.unit.services.logapi.drivers.ovn import test_driver
OvnPortInfo = collections.namedtuple('OvnPortInfo', ['name'])
class FakeACL:
def __init__(self, **kwargs):
@@ -1021,7 +1022,7 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
'tcp.dst >= 1 && tcp.dst <= 65535'),
'meter': 'acl_log_meter' if test_logging else [],
ovn_const.OVN_SG_RULE_EXT_ID_KEY: 'ruleid2'
},
},
]
del_acls_list = [
('pg_sg_stale', 'to-lport', 1000, 'outport == @pg_sg_stale')]
@@ -2426,10 +2426,10 @@ class TestOVNMechanismDriver(TestOVNMechanismDriverBase):
with mock.patch.object(
mech_driver.OVNMechanismDriver,
'_is_port_provisioning_required', lambda *_: True), \
mock.patch.object(
mock.patch.object(
mech_driver.OVNMechanismDriver,
'_notify_dhcp_updated') as mock_notify_dhcp, \
mock.patch.object(
mock.patch.object(
ovn_client.OVNClient, 'update_port') as mock_update_port:
ovn_conf.cfg.CONF.set_override(
@@ -120,14 +120,14 @@ class TestMigrateNeutronDatabaseToOvn(
# Check there are no vxlan allocations
vxlan_allocations = session.query(
vxlanallocation.VxlanAllocation).filter(
vxlanallocation.VxlanAllocation.allocated == True # noqa
vxlanallocation.VxlanAllocation.allocated == True # noqa
).all()
self.assertFalse(vxlan_allocations)
# Check all the networks have Geneve allocations
geneve_allocations = session.query(
geneveallocation.GeneveAllocation).filter(
geneveallocation.GeneveAllocation.allocated == True # noqa
geneveallocation.GeneveAllocation.allocated == True # noqa
).all()
self.assertEqual(len(networks), len(geneve_allocations))
@@ -32,6 +32,7 @@ class OVOServerRpcInterfaceTestCase(test_plugin.Ml2PluginV2TestCase):
self.plugin = directory.get_plugin()
self.ctx = context.get_admin_context()
self.received = []
def receive(s, ctx, obs, evt):
return self.received.append((obs[0], evt))
mock.patch('neutron.api.rpc.handlers.resources_rpc.'
@@ -1245,9 +1245,10 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
def test_port_after_update_outside_transaction(self):
self.tx_open = True
def receive(r, e, t, payload):
return setattr(self, 'tx_open',
db_api.is_session_active(payload.context.session))
db_api.is_session_active(payload.context.session))
with self.port() as p:
registry.subscribe(receive, resources.PORT, events.AFTER_UPDATE)
@@ -1257,9 +1258,10 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
def test_port_after_delete_outside_transaction(self):
self.tx_open = True
def receive(r, e, t, payload):
return setattr(self, 'tx_open',
db_api.is_session_active(payload.context.session))
db_api.is_session_active(payload.context.session))
with self.port() as p:
registry.subscribe(receive, resources.PORT, events.AFTER_DELETE)
@@ -1837,8 +1839,10 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
ctx = context.get_admin_context()
b_update_events = []
a_update_events = []
def b_receiver(r, e, t, payload):
return b_update_events.append(payload)
def a_receiver(r, e, t, payload):
return a_update_events.append(payload.latest_state)
@@ -2103,6 +2107,7 @@ class TestMl2PortsV2WithRevisionPlugin(Ml2PluginV2TestCase):
**host_arg) as port:
port = plugin.get_port(ctx, port['port']['id'])
updated_ports = []
def receiver(r, e, t, payload):
return updated_ports.append(payload.latest_state)
@@ -2117,6 +2122,7 @@ class TestMl2PortsV2WithRevisionPlugin(Ml2PluginV2TestCase):
def test_bind_port_bumps_revision(self):
updated_ports = []
created_ports = []
def ureceiver(r, e, t, payload):
return updated_ports.append(payload.latest_state)
@@ -2075,6 +2075,7 @@ class TestGetL3AgentsWithAgentModeFilter(TestGetL3AgentsWithFilter):
This class tests the L3AgentSchedulerDbMixin.get_l3_agents()
for the 'agent_mode' filter with various values.
"""
def _get_agent_mode(self, agent):
agent_conf = self.plugin.get_configuration_dict(agent)
return agent_conf.get('agent_mode', 'None')
@@ -2094,6 +2095,7 @@ class TestGetL3AgentsWithHostFilter(TestGetL3AgentsWithFilter):
This class tests the L3AgentSchedulerDbMixin.get_l3_agents()
for the 'host' filter with various values.
"""
def _get_host(self, agent):
return agent.get('host', 'None')
@@ -35,6 +35,7 @@ class FakePlugin:
class TestSGLogRequestValidations(base.BaseTestCase):
"""Test validation for a request"""
def setUp(self):
self.log_plugin = FakePlugin()
importutils.import_module('neutron.services.logapi.common.sg_validate')
@@ -33,6 +33,7 @@ class FakePlugin:
class TestSnatLogRequestValidations(base.BaseTestCase):
"""Test validation for SNAT log request"""
def setUp(self):
self.log_plugin = FakePlugin()
importutils.import_module('neutron.services.logapi.common.'
@@ -41,6 +41,7 @@ class TestOVNDriverBase(base.BaseTestCase):
self.plugin_driver.nb_ovn = fake_resources.FakeOvsdbNbOvnIdl()
self.log_plugin = mock.Mock()
def get_mock_log_plugin(alias):
return self.log_plugin if (
alias == plugin_constants.LOG_API) else None
@@ -70,6 +70,7 @@ class TestLogDriversManagerBase(base.BaseLogTestCase):
class TestLogDriversManagerMulti(TestLogDriversManagerBase):
"""Test calls happen to all drivers"""
def test_driver_manager_empty_with_no_drivers(self):
driver_manager = self._create_manager_with_drivers({})
self.assertEqual(0, len(driver_manager.drivers))
@@ -93,6 +94,7 @@ class TestLogDriversManagerMulti(TestLogDriversManagerBase):
class TestLogDriversManagerLoggingTypes(TestLogDriversManagerBase):
"""Test supported logging types"""
def test_available_logging_types(self):
driver_manager = self._create_manager_with_drivers(
{'driver-A': {'is_loaded': True,
@@ -2244,7 +2244,7 @@ class OVNL3ExtrarouteTests(test_l3_gw.ExtGwModeIntTestCase,
'device_id': ['fake_device']
})
subnet_id = \
gw_ports[0]['fixed_ips'][0]['subnet_id'] if gw_ports else None
gw_ports[0]['fixed_ips'][0]['subnet_id'] if gw_ports else None
self.l3_inst._nb_ovn.add_static_route.assert_called_once_with(
'neutron-fake_device', ip_prefix=constants.IPv4_ANY,

Some files were not shown because too many files have changed in this diff Show More