Merge "Start enforcing E125 flake8 directive"

This commit is contained in:
Zuul 2019-07-22 15:35:27 +00:00 committed by Gerrit Code Review
commit 40ad64181d
100 changed files with 385 additions and 424 deletions

View File

@ -355,7 +355,7 @@ class DvrEdgeRouter(dvr_local_router.DvrLocalRouter):
fixed = fip['fixed_ip_address']
fip_ip = fip['floating_ip_address']
for chain, rule in self._centralized_floating_forward_rules(
fip_ip, fixed):
fip_ip, fixed):
self.snat_iptables_manager.ipv4['nat'].add_rule(
chain, rule, tag='floating_ip')

View File

@ -622,7 +622,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
def process_external(self):
if self.agent_conf.agent_mode != (
lib_constants.L3_AGENT_MODE_DVR_NO_EXTERNAL):
lib_constants.L3_AGENT_MODE_DVR_NO_EXTERNAL):
ex_gw_port = self.get_ex_gw_port()
if ex_gw_port:
self.create_dvr_external_gateway_on_agent(ex_gw_port)
@ -756,8 +756,8 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
operation, route, fip_ns_name, tbl_index)
super(DvrLocalRouter, self).update_routing_table(operation, route)
def _update_fip_route_table_with_next_hop_routes(
self, operation, route, fip_ns_name, tbl_index):
def _update_fip_route_table_with_next_hop_routes(self, operation, route,
fip_ns_name, tbl_index):
cmd = ['ip', 'route', operation, 'to', route['destination'],
'via', route['nexthop'], 'table', tbl_index]
ip_wrapper = ip_lib.IPWrapper(namespace=fip_ns_name)
@ -768,8 +768,8 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
"router %(id)s",
{'ns': fip_ns_name, 'id': self.router_id})
def _check_if_route_applicable_to_fip_namespace(
self, route, agent_gateway_port):
def _check_if_route_applicable_to_fip_namespace(self, route,
agent_gateway_port):
ip_cidrs = common_utils.fixed_ip_cidrs(agent_gateway_port['fixed_ips'])
nexthop_cidr = netaddr.IPAddress(route['nexthop'])
for gw_cidr in ip_cidrs:

View File

@ -56,8 +56,8 @@ class RootwrapDaemonHelper(object):
def get_client(cls):
with cls.__lock:
if cls.__client is None:
if xenapi_root_helper.ROOT_HELPER_DAEMON_TOKEN == \
cfg.CONF.AGENT.root_helper_daemon:
if (xenapi_root_helper.ROOT_HELPER_DAEMON_TOKEN ==
cfg.CONF.AGENT.root_helper_daemon):
cls.__client = xenapi_root_helper.XenAPIClient()
else:
cls.__client = client.Client(

View File

@ -82,7 +82,7 @@ class MeteringAgentNotifyAPI(object):
"""Notify all the agents that are hosting the routers."""
plugin = directory.get_plugin(plugin_constants.L3)
if extensions.is_extension_supported(
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
self._agent_notification(context, method, routers)
else:
cctxt = self.client.prepare(fanout=True)

View File

@ -80,7 +80,7 @@ class DhcpRpcCallback(object):
host = kwargs.get('host')
plugin = directory.get_plugin()
if extensions.is_extension_supported(
plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS):
plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS):
if cfg.CONF.network_auto_schedule:
plugin.auto_schedule_networks(context, host)
nets = plugin.list_active_networks_on_active_dhcp_agent(

View File

@ -74,7 +74,7 @@ class L3RpcCallback(object):
calling update_device_up.
"""
if not extensions.is_extension_supported(
self.plugin, constants.PORT_BINDING_EXT_ALIAS):
self.plugin, constants.PORT_BINDING_EXT_ALIAS):
return
device_filter = {
'device_owner': [constants.DEVICE_OWNER_ROUTER_HA_INTF],
@ -117,7 +117,7 @@ class L3RpcCallback(object):
context = neutron_context.get_admin_context()
routers = self._routers_to_sync(context, router_ids, host)
if extensions.is_extension_supported(
self.plugin, constants.PORT_BINDING_EXT_ALIAS):
self.plugin, constants.PORT_BINDING_EXT_ALIAS):
self._ensure_host_set_on_ports(context, host, routers)
# refresh the data structure after ports are bound
routers = self._routers_to_sync(context, router_ids, host)
@ -128,7 +128,7 @@ class L3RpcCallback(object):
def _routers_to_sync(self, context, router_ids, host=None):
if extensions.is_extension_supported(
self.l3plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
self.l3plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
routers = (
self.l3plugin.list_active_sync_routers_on_active_l3_agent(
context, host, router_ids))

View File

@ -128,8 +128,8 @@ class External_net_db_mixin(object):
# (and thus, possible floating IPs) on this network before
# allow it to be update to external=False
if context.session.query(models_v2.Port.id).filter_by(
device_owner=constants.DEVICE_OWNER_ROUTER_GW,
network_id=net_data['id']).first():
device_owner=constants.DEVICE_OWNER_ROUTER_GW,
network_id=net_data['id']).first():
raise extnet_exc.ExternalNetworkInUse(net_id=net_id)
net_obj.ExternalNetwork.delete_objects(

View File

@ -2091,8 +2091,7 @@ class L3_NAT_db_mixin(L3_NAT_dbonly_mixin, L3RpcNotifierMixin):
super(L3_NAT_db_mixin, self).notify_routers_updated(
context, list(router_ids), 'disassociate_floatingips', {})
def _migrate_router_ports(
self, context, router_db, old_owner, new_owner):
def _migrate_router_ports(self, context, router_db, old_owner, new_owner):
"""Update the model to support the dvr case of a router."""
for rp in router_db.attached_ports:
if rp.port_type == old_owner:

View File

@ -186,8 +186,8 @@ class DVRResourceOperationHandler(object):
not old_router.get(l3_apidef.EXTERNAL_GW_INFO))
if not do_create:
return
if not self._create_snat_intf_ports_if_not_exists(
context.elevated(), router_db):
if not self._create_snat_intf_ports_if_not_exists(context.elevated(),
router_db):
LOG.debug("SNAT interface ports not created: %s",
router_db['id'])
return router_db
@ -220,8 +220,9 @@ class DVRResourceOperationHandler(object):
msg = _("Unable to create the SNAT Interface Port")
raise n_exc.BadRequest(resource='router', msg=msg)
with plugin_utils.delete_port_on_error(
self.l3plugin._core_plugin, context.elevated(), snat_port['id']):
with plugin_utils.delete_port_on_error(self.l3plugin._core_plugin,
context.elevated(),
snat_port['id']):
l3_obj.RouterPort(
context,
port_id=snat_port['id'],
@ -313,8 +314,8 @@ class DVRResourceOperationHandler(object):
self.l3plugin.l3_rpc_notifier.delete_fipnamespace_for_ext_net(
context, network_id)
def delete_floatingip_agent_gateway_port(
self, context, host_id, ext_net_id):
def delete_floatingip_agent_gateway_port(self, context, host_id,
ext_net_id):
"""Function to delete FIP gateway port with given ext_net_id."""
# delete any fip agent gw port
device_filter = {'device_owner': [const.DEVICE_OWNER_AGENT_GW],
@ -327,8 +328,8 @@ class DVRResourceOperationHandler(object):
if host_id:
return
def _get_ports_for_allowed_address_pair_ip(
self, context, network_id, fixed_ip):
def _get_ports_for_allowed_address_pair_ip(self, context, network_id,
fixed_ip):
"""Return all active ports associated with the allowed_addr_pair ip."""
query = context.session.query(
models_v2.Port).filter(
@ -391,8 +392,7 @@ class DVRResourceOperationHandler(object):
self._inherit_service_port_and_arp_update(
context, addr_pair_active_service_port_list[0])
def _inherit_service_port_and_arp_update(
self, context, service_port):
def _inherit_service_port_and_arp_update(self, context, service_port):
"""Function inherits port host bindings for allowed_address_pair."""
service_port_dict = self.l3plugin._core_plugin._make_port_dict(
service_port)
@ -473,8 +473,8 @@ class DVRResourceOperationHandler(object):
port['id'])
LOG.debug("CSNAT port updated for IPv6 subnet: %s", updated_port)
def _find_v6_router_port_by_network_and_device_owner(
self, router, net_id, device_owner):
def _find_v6_router_port_by_network_and_device_owner(self, router, net_id,
device_owner):
for port in router.attached_ports:
p = port['port']
if (p['network_id'] == net_id and
@ -482,8 +482,8 @@ class DVRResourceOperationHandler(object):
self.l3plugin._port_has_ipv6_address(p)):
return self.l3plugin._core_plugin._make_port_dict(p)
def _check_for_multiprefix_csnat_port_and_update(
self, context, router, network_id, subnet_id):
def _check_for_multiprefix_csnat_port_and_update(self, context, router,
network_id, subnet_id):
"""Checks if the csnat port contains multiple ipv6 prefixes.
If the csnat port contains multiple ipv6 prefixes for the given
@ -764,7 +764,7 @@ class _DVRAgentInterfaceMixin(object):
if router:
if router['distributed']:
if self._skip_floating_ip_for_mismatched_agent_or_host(
floating_ip, agent, host):
floating_ip, agent, host):
continue
router_floatingips = router.get(const.FLOATINGIP_KEY, [])
router_floatingips.append(floating_ip)
@ -889,8 +889,8 @@ class _DVRAgentInterfaceMixin(object):
self._get_dvr_migrating_service_port_hostid(
context, fip['port_id'], port=vm_port))
vm_port_agent_mode = vm_port.get('agent', None)
if vm_port_agent_mode != (
const.L3_AGENT_MODE_DVR_NO_EXTERNAL):
if (vm_port_agent_mode !=
const.L3_AGENT_MODE_DVR_NO_EXTERNAL):
# For floatingip configured on ports that do not
# reside on a 'dvr_no_external' agent, add the
# fip host binding, else it will be created
@ -926,8 +926,8 @@ class _DVRAgentInterfaceMixin(object):
port_db = port or self._core_plugin.get_port(context, port_id)
return port_db[portbindings.HOST_ID] or None
def _get_dvr_migrating_service_port_hostid(
self, context, port_id, port=None):
def _get_dvr_migrating_service_port_hostid(self, context, port_id,
port=None):
"""Returns the migrating host_id from the migrating profile."""
port_db = port or self._core_plugin.get_port(context, port_id)
port_profile = port_db.get(portbindings.PROFILE)
@ -963,8 +963,8 @@ class _DVRAgentInterfaceMixin(object):
self.create_fip_agent_gw_port_if_not_exists(
context.elevated(), network_id, host)
def create_fip_agent_gw_port_if_not_exists(
self, context, network_id, host):
def create_fip_agent_gw_port_if_not_exists(self, context, network_id,
host):
"""Function to return the FIP Agent GW port.
This function will create a FIP Agent GW port
@ -1010,8 +1010,8 @@ class _DVRAgentInterfaceMixin(object):
self._populate_mtu_and_subnets_for_ports(context, [agent_port])
return agent_port
def _generate_arp_table_and_notify_agent(
self, context, fixed_ip, mac_address, notifier):
def _generate_arp_table_and_notify_agent(self, context, fixed_ip,
mac_address, notifier):
"""Generates the arp table entry and notifies the l3 agent."""
ip_address = fixed_ip['ip_address']
subnet = fixed_ip['subnet_id']
@ -1025,8 +1025,7 @@ class _DVRAgentInterfaceMixin(object):
for router_id in routers:
notifier(context, router_id, arp_table)
def _get_subnet_id_for_given_fixed_ip(
self, context, fixed_ip, port_dict):
def _get_subnet_id_for_given_fixed_ip(self, context, fixed_ip, port_dict):
"""Returns the subnet_id that matches the fixedip on a network."""
filters = {'network_id': [port_dict['network_id']]}
subnets = self._core_plugin.get_subnets(context, filters)
@ -1072,8 +1071,8 @@ class _DVRAgentInterfaceMixin(object):
context, fixed_ip, port_dict['mac_address'],
self.l3_rpc_notifier.add_arp_entry)
def delete_arp_entry_for_dvr_service_port(
self, context, port_dict, fixed_ips_to_delete=None):
def delete_arp_entry_for_dvr_service_port(self, context, port_dict,
fixed_ips_to_delete=None):
"""Notify L3 agents of ARP table entry for dvr service port.
When a dvr service port goes down, look for the DVR

View File

@ -38,7 +38,7 @@ class MeteringRpcCallbacks(object):
metering_data = self.meter_plugin.get_sync_data_metering(context)
host = kwargs.get('host')
if not extensions.is_extension_supported(
l3_plugin, consts.L3_AGENT_SCHEDULER_EXT_ALIAS) or not host:
l3_plugin, consts.L3_AGENT_SCHEDULER_EXT_ALIAS) or not host:
return metering_data
else:
agents = l3_plugin.get_l3_agents(context, filters={'host': [host]})

View File

@ -30,8 +30,8 @@ class PortSecurityDbCommon(object):
response_data[psec.PORTSECURITY] = (
db_data['port_security'][psec.PORTSECURITY])
def _process_port_security_create(
self, context, obj_cls, res_name, req, res):
def _process_port_security_create(self, context, obj_cls, res_name, req,
res):
obj = obj_cls(
context,
id=res['id'],
@ -41,14 +41,13 @@ class PortSecurityDbCommon(object):
res[psec.PORTSECURITY] = req[psec.PORTSECURITY]
return self._make_port_security_dict(obj, res_name)
def _process_port_port_security_create(
self, context, port_req, port_res):
def _process_port_port_security_create(self, context, port_req, port_res):
self._process_port_security_create(
context, p_ps.PortSecurity, 'port',
port_req, port_res)
def _process_network_port_security_create(
self, context, network_req, network_res):
def _process_network_port_security_create(self, context, network_req,
network_res):
self._process_port_security_create(
context, network.NetworkPortSecurity, 'network',
network_req, network_res)
@ -66,19 +65,18 @@ class PortSecurityDbCommon(object):
def _get_port_security_binding(self, context, port_id):
return self._get_security_binding(context, p_ps.PortSecurity, port_id)
def _process_port_port_security_update(
self, context, port_req, port_res):
def _process_port_port_security_update(self, context, port_req, port_res):
self._process_port_security_update(
context, p_ps.PortSecurity, 'port', port_req, port_res)
def _process_network_port_security_update(
self, context, network_req, network_res):
def _process_network_port_security_update(self, context, network_req,
network_res):
self._process_port_security_update(
context, network.NetworkPortSecurity, 'network',
network_req, network_res)
def _process_port_security_update(
self, context, obj_cls, res_name, req, res):
def _process_port_security_update(self, context, obj_cls, res_name, req,
res):
if psec.PORTSECURITY not in req:
return
port_security_enabled = req[psec.PORTSECURITY]

View File

@ -97,7 +97,7 @@ def remove_provisioning_component(context, object_id, object_type, entity,
if not standard_attr_id:
return False
if pb_obj.ProvisioningBlock.delete_objects(
context, standard_attr_id=standard_attr_id, entity=entity):
context, standard_attr_id=standard_attr_id, entity=entity):
return True
else:
return False

View File

@ -26,7 +26,7 @@ def utcnow():
class QuotaUsageInfo(collections.namedtuple(
'QuotaUsageInfo', ['resource', 'tenant_id', 'used', 'dirty'])):
'QuotaUsageInfo', ['resource', 'tenant_id', 'used', 'dirty'])):
"""Information about resource quota usage."""

View File

@ -118,8 +118,7 @@ class DbQuotaDriver(object):
Raise a "not found" error if the quota for the given tenant was
never defined.
"""
if quota_obj.Quota.delete_objects(
context, project_id=tenant_id) < 1:
if quota_obj.Quota.delete_objects(context, project_id=tenant_id) < 1:
# No record deleted means the quota was not found
raise exceptions.TenantQuotaNotFound(tenant_id=tenant_id)

View File

@ -254,8 +254,9 @@ class L2populationMechanismDriver(api.MechanismDriver):
agent_host = context.host
l3plugin = directory.get_plugin(plugin_constants.L3)
# when agent transitions to backup, don't remove flood flows
if agent_host and l3plugin and getattr(
l3plugin, "list_router_ids_on_host", None):
if agent_host and l3plugin and getattr(l3plugin,
"list_router_ids_on_host",
None):
admin_context = n_context.get_admin_context()
port_context = context._plugin_context
fdb_entries = self._get_agent_fdb(

View File

@ -557,7 +557,7 @@ class LinuxBridgeManager(amb.CommonAgentManagerBase):
if not device_owner.startswith(constants.DEVICE_OWNER_COMPUTE_PREFIX):
# Check if device needs to be added to bridge
if not bridge_lib.BridgeDevice.get_interface_bridge(
tap_device_name):
tap_device_name):
data = {'tap_device_name': tap_device_name,
'bridge_name': bridge_name}
LOG.debug("Adding device %(tap_device_name)s to bridge "
@ -874,10 +874,9 @@ class LinuxBridgeManager(amb.CommonAgentManagerBase):
return lconst.EXTENSION_DRIVER_TYPE
class LinuxBridgeRpcCallbacks(
sg_rpc.SecurityGroupAgentRpcCallbackMixin,
l2pop_rpc.L2populationRpcCallBackMixin,
amb.CommonAgentManagerRpcCallBackBase):
class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
l2pop_rpc.L2populationRpcCallBackMixin,
amb.CommonAgentManagerRpcCallBackBase):
# Set RPC API version to 1.0 by default.
# history

View File

@ -1133,7 +1133,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
bridge.set_allowed_macs_for_port(port=vif.ofport, allow_all=True)
return
if port_details['device_owner'].startswith(
n_const.DEVICE_OWNER_NETWORK_PREFIX):
n_const.DEVICE_OWNER_NETWORK_PREFIX):
LOG.debug("Skipping ARP spoofing rules for network owned port "
"'%s'.", vif.port_name)
bridge.delete_arp_spoofing_protection(port=vif.ofport)
@ -2300,8 +2300,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# REVISIT (rossella_s) Define a method "reset" in
# BasePollingManager that will be implemented by AlwaysPoll as
# no action and by InterfacePollingMinimizer as start/stop
if isinstance(
polling_manager, polling.InterfacePollingMinimizer):
if isinstance(polling_manager, polling.InterfacePollingMinimizer):
polling_manager.stop()
polling_manager.start()

View File

@ -147,9 +147,9 @@ class VlanTypeDriver(helpers.SegmentTypeDriver):
# allocated, update_objects will return 0 so we
# don't delete.
if vlanalloc.VlanAllocation.update_objects(
ctx, values={'allocated': False},
allocated=False, vlan_id=alloc.vlan_id,
physical_network=physical_network):
ctx, values={'allocated': False},
allocated=False, vlan_id=alloc.vlan_id,
physical_network=physical_network):
alloc.delete()
del allocations[physical_network]

View File

@ -491,7 +491,7 @@ def _update_port_in_external_dns_service(resource, event, trigger, **kwargs):
dns_data_db = port_obj.PortDNS.get_object(
context, port_id=updated_port['id'])
if not (dns_data_db and (dns_data_db['previous_dns_name'] or dns_data_db[
'current_dns_name'])):
'current_dns_name'])):
return
if dns_data_db['previous_dns_name']:
_remove_data_from_external_dns_service(

View File

@ -1933,8 +1933,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self._post_delete_port(
context, port, router_ids, bound_mech_contexts)
def _post_delete_port(
self, context, port, router_ids, bound_mech_contexts):
def _post_delete_port(self, context, port, router_ids,
bound_mech_contexts):
kwargs = {
'context': context,
'port': port,

View File

@ -68,8 +68,7 @@ class AutoScheduler(object):
segments_on_host = {s.segment_id for s in segment_host_mapping}
for dhcp_agent in dhcp_agents:
if agent_utils.is_agent_down(
dhcp_agent.heartbeat_timestamp):
if agent_utils.is_agent_down(dhcp_agent.heartbeat_timestamp):
LOG.warning('DHCP agent %s is not active', dhcp_agent.id)
continue
for net_id, is_routed_network in net_ids.items():

View File

@ -307,8 +307,9 @@ class AutoAllocatedTopologyMixin(object):
network_id = network['id'] if network else None
raise exceptions.UnknownProvisioningError(e, network_id=network_id)
def _provision_external_connectivity(
self, context, default_external_network, subnets, tenant_id):
def _provision_external_connectivity(self, context,
default_external_network, subnets,
tenant_id):
"""Uplink tenant subnet(s) to external network."""
router_args = {
'name': 'auto_allocated_router',

View File

@ -243,7 +243,7 @@ class DriverController(object):
class _LegacyPlusProviderConfiguration(
provider_configuration.ProviderConfiguration):
provider_configuration.ProviderConfiguration):
def __init__(self):
# loads up ha, dvr, and single_node service providers automatically.

View File

@ -346,8 +346,8 @@ class IptablesMeteringDriver(abstract_driver.MeteringAbstractDriver):
rm.iptables_manager.ipv4['filter'].add_rule(
label_chain, '', wrap=False)
def _process_metering_rule_action_based_on_ns(
self, router, action, ext_dev, im):
def _process_metering_rule_action_based_on_ns(self, router, action,
ext_dev, im):
'''Process metering rule actions based specific namespaces.'''
rm = self.routers.get(router['id'])
with IptablesManagerTransaction(im):

View File

@ -23,8 +23,7 @@ class SegmentNotFound(exceptions.NotFound):
message = _("Segment %(segment_id)s could not be found.")
class NoUpdateSubnetWhenMultipleSegmentsOnNetwork(
exceptions.BadRequest):
class NoUpdateSubnetWhenMultipleSegmentsOnNetwork(exceptions.BadRequest):
message = _("The network '%(network_id)s' has multiple segments, it is "
"only possible to associate an existing subnet with a segment "
"on networks with a single segment.")

View File

@ -116,5 +116,5 @@ class TagPlugin(tagging.TagPluginBase):
def delete_tag(self, context, resource, resource_id, tag):
res = self._get_resource(context, resource, resource_id)
if not tag_obj.Tag.delete_objects(
context, tag=tag, standard_attr_id=res.standard_attr_id):
context, tag=tag, standard_attr_id=res.standard_attr_id):
raise tagging.TagNotFound(tag=tag)

View File

@ -75,8 +75,7 @@ class OVSTrunkSkeleton(agent.TrunkSkeleton):
{'event': event_type, 'subports': subports, 'err': e})
@local_registry.receives(resources.TRUNK, [local_events.BEFORE_CREATE])
def check_trunk_dependencies(
self, resource, event, trigger, **kwargs):
def check_trunk_dependencies(self, resource, event, trigger, **kwargs):
# The OVS trunk driver does not work with iptables firewall and QoS.
# We should validate the environment configuration and signal that
# something might be wrong.

View File

@ -256,8 +256,8 @@ class SubPortsValidator(object):
if subport['port_id'] == self.trunk_port_id:
raise trunk_exc.ParentPortInUse(port_id=subport['port_id'])
def _raise_subport_invalid_mtu(
self, context, subport, trunk_port_mtu, subport_mtus):
def _raise_subport_invalid_mtu(self, context, subport, trunk_port_mtu,
subport_mtus):
# Check MTU sanity - subport MTU must not exceed trunk MTU.
# If for whatever reason trunk_port_mtu is not available,
# the MTU sanity check cannot be enforced.

View File

@ -88,8 +88,8 @@ def wait_until_dscp_marking_rule_applied_ovs(bridge, port_vif, rule):
common_utils.wait_until_true(_dscp_marking_rule_applied)
def wait_until_dscp_marking_rule_applied_linuxbridge(
namespace, port_vif, expected_rule):
def wait_until_dscp_marking_rule_applied_linuxbridge(namespace, port_vif,
expected_rule):
iptables = iptables_manager.IptablesManager(
namespace=namespace)

View File

@ -680,8 +680,8 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
self._assert_ip_address_on_interface(namespace, interface,
ip_address)
def _assert_ip_address_not_on_interface(
self, namespace, interface, ip_address):
def _assert_ip_address_not_on_interface(self,
namespace, interface, ip_address):
self.assertNotIn(
ip_address, self._get_addresses_on_device(namespace, interface))
@ -690,8 +690,8 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
self.assertIn(
ip_address, self._get_addresses_on_device(namespace, interface))
def _assert_ping_reply_from_expected_address(
self, ping_result, expected_address):
def _assert_ping_reply_from_expected_address(self, ping_result,
expected_address):
ping_results = ping_result.split('\n')
self.assertGreater(
len(ping_results), 1,

View File

@ -128,7 +128,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.assertEqual(2, interface_rules_list_count)
def test_dvr_update_gateway_port_no_fip_fg_port_recovers_itself_with_fpr(
self):
self):
self.agent.conf.agent_mode = 'dvr'
# Create the router with external net
router_info = self.generate_dvr_router_info()
@ -232,8 +232,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self._assert_fip_namespace_deleted(external_gw_port)
@mock.patch.object(dvr_fip_ns.FipNamespace, 'subscribe')
def test_dvr_process_fips_with_no_gw_port_in_namespace(
self, fip_subscribe):
def test_dvr_process_fips_with_no_gw_port_in_namespace(self,
fip_subscribe):
self.agent.conf.agent_mode = 'dvr'
# Create the router with external net
@ -584,8 +584,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.agent._register_router_cls(self.agent.router_factory)
return router
def _get_fip_agent_gw_port_for_router(
self, external_gw_port):
def _get_fip_agent_gw_port_for_router(self, external_gw_port):
# Add fip agent gateway port information to the router_info
if external_gw_port:
# Get values from external gateway port
@ -915,16 +914,16 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.assertFalse(self._fixed_ip_rule_exists(router_ns, fixed_ip))
self.assertTrue(self._fixed_ip_rule_exists(router_ns, new_fixed_ip))
def _assert_iptables_rules_exist(
self, router_iptables_manager, table_name, expected_rules):
def _assert_iptables_rules_exist(self, router_iptables_manager,
table_name, expected_rules):
rules = router_iptables_manager.get_rules_for_table(table_name)
for rule in expected_rules:
self.assertIn(
str(iptables_manager.IptablesRule(rule[0], rule[1])), rules)
return True
def _assert_iptables_rules_not_exist(
self, router_iptables_manager, table_name, expected_rules):
def _assert_iptables_rules_not_exist(self, router_iptables_manager,
table_name, expected_rules):
rules = router_iptables_manager.get_rules_for_table(table_name)
for rule in expected_rules:
self.assertNotIn(
@ -1092,8 +1091,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
router.router_id)
self.assertTrue(self._namespace_exists(namespace))
def _get_dvr_snat_namespace_device_status(
self, router, internal_dev_name=None):
def _get_dvr_snat_namespace_device_status(self, router,
internal_dev_name=None):
"""Function returns the internal and external device status."""
snat_ns = dvr_snat_ns.SnatNamespace.get_snat_ns_name(
router.router_id)
@ -1451,7 +1450,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
@mock.patch.object(
dvr_ha_router.DvrEdgeHaRouter, 'get_centralized_fip_cidr_set')
def test_dvr_ha_router_with_centralized_fip_calls_keepalived_cidr(
self, connect_rtr_2_fip_mock, fip_cidr_centralized_mock):
self, connect_rtr_2_fip_mock, fip_cidr_centralized_mock):
self._setup_dvr_ha_agents()
self._setup_dvr_ha_bridges()
@ -1470,7 +1469,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
@mock.patch.object(
dvr_edge_router.DvrEdgeRouter, 'get_centralized_fip_cidr_set')
def test_dvr_router_with_centralized_fip_calls_keepalived_cidr(
self, connect_rtr_2_fip_mock, fip_cidr_centralized_mock):
self, connect_rtr_2_fip_mock, fip_cidr_centralized_mock):
router_info = self.generate_dvr_router_info(
enable_gw=True, enable_centralized_fip=True, snat_bound_fip=True)
@ -1626,9 +1625,9 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.failover_agent._process_updated_router(router2.router)
self.assertFalse(r2_chsfr.called)
def _setup_dvr_router_static_routes(
self, router_namespace=True,
check_fpr_int_rule_delete=False, enable_ha=False):
def _setup_dvr_router_static_routes(self, router_namespace=True,
check_fpr_int_rule_delete=False,
enable_ha=False):
"""Test to validate the extra routes on dvr routers."""
self.agent.conf.agent_mode = 'dvr_snat'
router_info = self.generate_dvr_router_info(
@ -1739,20 +1738,20 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self._setup_dvr_router_static_routes(router_namespace=False)
def test_dvr_router_static_routes_in_snat_namespace_and_router_namespace(
self):
self):
self._setup_dvr_router_static_routes()
def test_dvr_ha_rtr_static_routes_in_rtr_namespace(self):
self._setup_dvr_router_static_routes(enable_ha=True)
def test_dvr_router_rule_and_route_table_cleared_when_fip_removed(
self):
def test_dvr_router_rule_and_route_table_cleared_when_fip_removed(self):
self._setup_dvr_router_static_routes(
router_namespace=False, check_fpr_int_rule_delete=True)
def _assert_fip_namespace_interface_static_routes(
self, address_scopes, fpr_device,
router_info, rtr_2_fip, fpr_device_name):
def _assert_fip_namespace_interface_static_routes(self, address_scopes,
fpr_device, router_info,
rtr_2_fip,
fpr_device_name):
fixed_ips_1 = router_info[lib_constants.INTERFACE_KEY][0]['fixed_ips']
fixed_ips_2 = router_info[lib_constants.INTERFACE_KEY][1]['fixed_ips']
actual_routes = fpr_device.route.list_routes(
@ -1790,9 +1789,11 @@ class TestDvrRouter(framework.L3AgentTestFramework):
else:
self.assertEqual([], actual_routes)
def _assert_interface_rules_on_gateway_remove(
self, router, agent, address_scopes, agent_gw_port,
rfp_device, fpr_device, no_external=False):
def _assert_interface_rules_on_gateway_remove(self, router, agent,
address_scopes,
agent_gw_port, rfp_device,
fpr_device,
no_external=False):
router.router[lib_constants.SNAT_ROUTER_INTF_KEY] = []
router.router['gw_port'] = ""
@ -1889,11 +1890,11 @@ class TestDvrRouter(framework.L3AgentTestFramework):
rfp_device, fpr_device)
def test_dvr_fip_and_router_namespace_rules_with_address_scopes_match(
self):
self):
self._setup_dvr_router_for_fast_path_exit(address_scopes=True)
def test_dvr_fip_and_router_namespace_rules_with_address_scopes_mismatch(
self):
self):
self._setup_dvr_router_for_fast_path_exit(address_scopes=False)
@mock.patch.object(dvr_local_router.DvrLocalRouter,
@ -1901,8 +1902,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
@mock.patch.object(dvr_local_router.DvrLocalRouter,
'_add_interface_route_to_fip_ns')
def test_dvr_no_external_router_namespace_rules_with_address_scopes_match(
self, mock_add_interface_route_rule,
mock_add_fip_interface_route_rule):
self, mock_add_interface_route_rule,
mock_add_fip_interface_route_rule):
"""Test to validate the router namespace routes.
This test validates the router namespace routes
@ -1978,8 +1979,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.assertIsNone(ex_gw_device.route.get_gateway())
self.assertIsNone(fg_device.route.get_gateway())
def _assert_fip_namespace_deleted(
self, ext_gateway_port, assert_ovs_interface=True):
def _assert_fip_namespace_deleted(self, ext_gateway_port,
assert_ovs_interface=True):
ext_net_id = ext_gateway_port['network_id']
fip_ns = self.agent.get_fip_ns(ext_net_id)
fip_ns.unsubscribe = mock.Mock()

View File

@ -570,7 +570,7 @@ class TestAZAwareWeightScheduler(test_dhcp_sch.TestDhcpSchedulerBaseTestCase,
class TestDHCPSchedulerWithNetworkAccessibility(
test_plugin.Ml2PluginV2TestCase):
test_plugin.Ml2PluginV2TestCase):
_mechanism_drivers = ['openvswitch']

View File

@ -328,7 +328,7 @@ class L3DvrTestCase(L3DvrTestCaseBase):
test_agent_mode=None)
def _test_create_floating_ip_agent_notification(
self, dvr=True, test_agent_mode=constants.L3_AGENT_MODE_DVR):
self, dvr=True, test_agent_mode=constants.L3_AGENT_MODE_DVR):
with self.subnet() as ext_subnet,\
self.subnet(cidr='20.0.0.0/24') as int_subnet,\
self.port(subnet=int_subnet,
@ -369,8 +369,8 @@ class L3DvrTestCase(L3DvrTestCaseBase):
self.l3_plugin.create_floatingip(
self.context, {'floatingip': floating_ip})
if dvr:
if test_agent_mode == (
constants.L3_AGENT_MODE_DVR_NO_EXTERNAL):
if (test_agent_mode ==
constants.L3_AGENT_MODE_DVR_NO_EXTERNAL):
if router['ha']:
expected_calls = [
mock.call(self.context,
@ -400,7 +400,7 @@ class L3DvrTestCase(L3DvrTestCaseBase):
self._test_create_floating_ip_agent_notification()
def test_create_floating_ip_agent_notification_for_dvr_no_external_agent(
self):
self):
agent_mode = constants.L3_AGENT_MODE_DVR_NO_EXTERNAL
self._test_create_floating_ip_agent_notification(
test_agent_mode=agent_mode)
@ -409,7 +409,7 @@ class L3DvrTestCase(L3DvrTestCaseBase):
self._test_create_floating_ip_agent_notification(dvr=False)
def _test_update_floating_ip_agent_notification(
self, dvr=True, test_agent_mode=constants.L3_AGENT_MODE_DVR):
self, dvr=True, test_agent_mode=constants.L3_AGENT_MODE_DVR):
with self.subnet() as ext_subnet,\
self.subnet(cidr='20.0.0.0/24') as int_subnet1,\
self.subnet(cidr='30.0.0.0/24') as int_subnet2,\
@ -465,8 +465,8 @@ class L3DvrTestCase(L3DvrTestCaseBase):
self.context, floating_ip['id'],
{'floatingip': updated_floating_ip})
if dvr:
if test_agent_mode == (
constants.L3_AGENT_MODE_DVR_NO_EXTERNAL):
if (test_agent_mode ==
constants.L3_AGENT_MODE_DVR_NO_EXTERNAL):
if router1['ha'] and router2['ha']:
self.assertEqual(
4,
@ -518,7 +518,7 @@ class L3DvrTestCase(L3DvrTestCaseBase):
self._test_update_floating_ip_agent_notification()
def test_update_floating_ip_agent_notification_with_dvr_no_external_agents(
self):
self):
agent_mode = constants.L3_AGENT_MODE_DVR_NO_EXTERNAL
self._test_update_floating_ip_agent_notification(
test_agent_mode=agent_mode)
@ -531,7 +531,7 @@ class L3DvrTestCase(L3DvrTestCaseBase):
test_agent_mode=None)
def _test_delete_floating_ip_agent_notification(
self, dvr=True, test_agent_mode=constants.L3_AGENT_MODE_DVR):
self, dvr=True, test_agent_mode=constants.L3_AGENT_MODE_DVR):
with self.subnet() as ext_subnet,\
self.subnet(cidr='20.0.0.0/24') as int_subnet,\
self.port(subnet=int_subnet,
@ -611,7 +611,7 @@ class L3DvrTestCase(L3DvrTestCaseBase):
self._test_delete_floating_ip_agent_notification()
def test_delete_floating_ip_agent_notification_with_dvr_no_external_agents(
self):
self):
agent_mode = constants.L3_AGENT_MODE_DVR_NO_EXTERNAL
self._test_delete_floating_ip_agent_notification(
test_agent_mode=agent_mode)

View File

@ -282,7 +282,7 @@ class TestAsyncProcessLogging(base.BaseTestCase):
def _test__read_stdout_logging(self, enable):
proc = async_process.AsyncProcess(['fakecmd'], log_output=enable)
with mock.patch.object(proc, '_read', return_value='fakedata'),\
mock.patch.object(proc, '_process'):
mock.patch.object(proc, '_process'):
proc._read_stdout()
self.assertEqual(enable, self.log_mock.debug.called)

View File

@ -262,7 +262,7 @@ class TestDhcpAgent(base.BaseTestCase):
# message.
with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
with testtools.ExpectedException(
exceptions.InvalidConfigurationOption):
exceptions.InvalidConfigurationOption):
dhcp_agent.DhcpAgent(HOSTNAME)
log.assert_any_call("DHCP agent must have resync_throttle <= "
"resync_interval")
@ -534,7 +534,7 @@ class TestDhcpAgent(base.BaseTestCase):
dhcp = dhcp_agent.DhcpAgentWithStateReport(HOSTNAME)
with mock.patch.object(dhcp.state_rpc,
'report_state') as report_state,\
mock.patch.object(dhcp, "run"):
mock.patch.object(dhcp, "run"):
report_state.return_value = agent_consts.AGENT_ALIVE
dhcp._report_state()
self.assertEqual({}, dhcp.needs_resync_reasons)
@ -793,8 +793,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.plugin.get_network_info.return_value = fake_network_ipv6_ipv4
self.call_driver.return_value = False
cfg.CONF.set_override('enable_isolated_metadata', True)
with mock.patch.object(
self.dhcp, 'enable_isolated_metadata_proxy') as enable_metadata:
with mock.patch.object(self.dhcp,
'enable_isolated_metadata_proxy') as enable_metadata:
self.dhcp.enable_dhcp_helper(fake_network_ipv6_ipv4.id)
self.plugin.assert_has_calls(
[mock.call.get_network_info(fake_network_ipv6_ipv4.id)])

View File

@ -370,8 +370,8 @@ class QosExtensionRpcTestCase(QosExtensionBaseTestCase):
self.assertIsNone(self.qos_ext.policy_map.get_port_policy(port))
def test__handle_notification_ignores_all_event_types_except_updated(self):
with mock.patch.object(
self.qos_ext, '_process_update_policy') as update_mock:
with mock.patch.object(self.qos_ext,
'_process_update_policy') as update_mock:
for event_type in set(events.VALID) - {events.UPDATED}:
self.qos_ext._handle_notification(mock.Mock(), 'QOS',
@ -379,8 +379,8 @@ class QosExtensionRpcTestCase(QosExtensionBaseTestCase):
self.assertFalse(update_mock.called)
def test__handle_notification_passes_update_events(self):
with mock.patch.object(
self.qos_ext, '_process_update_policy') as update_mock:
with mock.patch.object(self.qos_ext,
'_process_update_policy') as update_mock:
policy_obj = mock.Mock()
self.qos_ext._handle_notification(mock.Mock(), 'QOS',

View File

@ -41,7 +41,7 @@ HOSTNAME = 'testhost'
class PortForwardingExtensionBaseTestCase(
test_agent.BasicRouterOperationsFramework):
test_agent.BasicRouterOperationsFramework):
def setUp(self):
super(PortForwardingExtensionBaseTestCase, self).setUp()

View File

@ -958,8 +958,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
self.assertIsNotNone(ri.snat_namespace)
self.assertEqual(ri.snat_namespace.name, ri.get_gw_ns_name())
def test_ext_gw_updated_calling_snat_ns_delete_if_gw_port_host_none(
self):
def test_ext_gw_updated_calling_snat_ns_delete_if_gw_port_host_none(self):
"""Test to check the impact of snat_namespace object.
This function specifically checks the impact of the snat
@ -982,7 +981,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
@mock.patch.object(namespaces.Namespace, 'delete')
def test_snat_ns_delete_not_called_when_snat_namespace_does_not_exist(
self, mock_ns_del):
self, mock_ns_del):
"""Test to check the impact of snat_namespace object.
This function specifically checks the impact of the snat
@ -1409,8 +1408,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
self.assertEqual(1, ri.fip_ns.create_rtr_2_fip_link.call_count)
@mock.patch.object(lla.LinkLocalAllocator, '_write')
def test_floating_ip_not_configured_if_no_host_or_dest_host(
self, lla_write):
def test_floating_ip_not_configured_if_no_host_or_dest_host(self,
lla_write):
fake_network_id = _uuid()
subnet_id = _uuid()
fake_floatingips = {'floatingips': [
@ -2269,9 +2268,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
def test_process_router_floatingip_disabled(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
with mock.patch.object(
agent.plugin_rpc,
'update_floatingip_statuses') as mock_update_fip_status:
with mock.patch.object(agent.plugin_rpc,
'update_floatingip_statuses') as mock_update_fip_status:
fip_id = _uuid()
router = l3_test_common.prepare_router_data(num_internal_ports=1)
router[lib_constants.FLOATINGIP_KEY] = [
@ -2302,9 +2300,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
def test_process_router_floatingip_exception(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
with mock.patch.object(
agent.plugin_rpc,
'update_floatingip_statuses') as mock_update_fip_status:
with mock.patch.object(agent.plugin_rpc,
'update_floatingip_statuses') as mock_update_fip_status:
fip_id = _uuid()
router = l3_test_common.prepare_router_data(num_internal_ports=1)
router[lib_constants.FLOATINGIP_KEY] = [
@ -2327,9 +2324,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
def test_process_external_iptables_exception(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
with mock.patch.object(
agent.plugin_rpc,
'update_floatingip_statuses') as mock_update_fip_status:
with mock.patch.object(agent.plugin_rpc,
'update_floatingip_statuses') as mock_update_fip_status:
fip_id = _uuid()
router = l3_test_common.prepare_router_data(num_internal_ports=1)
router[lib_constants.FLOATINGIP_KEY] = [
@ -2628,9 +2624,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
'distributed': False}
driver = metadata_driver.MetadataDriver
with mock.patch.object(
driver, 'destroy_monitored_metadata_proxy') as destroy_proxy:
driver, 'destroy_monitored_metadata_proxy') as destroy_proxy:
with mock.patch.object(
driver, 'spawn_monitored_metadata_proxy') as spawn_proxy:
driver, 'spawn_monitored_metadata_proxy') as spawn_proxy:
agent._process_added_router(router)
if enableflag:
spawn_proxy.assert_called_with(

View File

@ -119,8 +119,8 @@ class TestDvrFipNs(base.BaseTestCase):
@mock.patch.object(ip_lib, 'send_ip_addr_adv_notif')
@mock.patch.object(dvr_fip_ns.FipNamespace, 'subscribe')
@mock.patch.object(dvr_fip_ns.FipNamespace, '_add_default_gateway_for_fip')
def test_update_gateway_port(
self, def_gw, fip_sub, send_adv_notif, IPDevice):
def test_update_gateway_port(self, def_gw, fip_sub, send_adv_notif,
IPDevice):
fip_sub.return_value = False
self.fip_ns._check_for_gateway_ip_change = mock.Mock(return_value=True)
agent_gw_port = self._get_agent_gw_port()
@ -142,8 +142,8 @@ class TestDvrFipNs(base.BaseTestCase):
@mock.patch.object(dvr_fip_ns.FipNamespace, 'subscribe')
@mock.patch.object(dvr_fip_ns.FipNamespace, 'delete')
@mock.patch.object(dvr_fip_ns.FipNamespace, 'unsubscribe')
def test_update_gateway_port_raises_exception(
self, fip_unsub, fip_delete, fip_sub, exists):
def test_update_gateway_port_raises_exception(self, fip_unsub, fip_delete,
fip_sub, exists):
agent_gw_port = self._get_agent_gw_port()
self.fip_ns._create_gateway_port = mock.Mock()
self.fip_ns.create_or_update_gateway_port(agent_gw_port)

View File

@ -187,7 +187,7 @@ class TestDvrRouterOperations(base.BaseTestCase):
self._setup_create_dvr_fip_interfaces_for_setting_routing_rules()
def _setup_create_dvr_fip_interfaces_for_setting_routing_rules(
self, address_scopes_match=False):
self, address_scopes_match=False):
ri = self._create_router()
ri.get_floating_agent_gw_interface = mock.Mock()
ri.fip_ns = mock.Mock()

View File

@ -196,7 +196,8 @@ class TestRouterInfo(base.BaseTestCase):
ri = router_info.RouterInfo(mock.Mock(), _uuid(), {}, **self.ri_kwargs)
ri.router = {'id': _uuid()}
with mock.patch.object(ri, '_process_internal_ports') as p_i_p,\
mock.patch.object(ri, '_process_external_on_delete') as p_e_o_d:
mock.patch.object(ri,
'_process_external_on_delete') as p_e_o_d:
self.mock_ip.netns.exists.return_value = False
ri.process_delete()
self.assertFalse(p_i_p.called)

View File

@ -2739,7 +2739,8 @@ class TestDnsmasq(TestBase):
self.conf, FakeDualNetworkDualDHCP()))
def _test__generate_opts_per_subnet_helper(
self, config_opts, expected_mdt_ip, network_class=FakeNetworkDhcpPort):
self, config_opts, expected_mdt_ip,
network_class=FakeNetworkDhcpPort):
for key, value in config_opts.items():
self.conf.set_override(key, value)
dm = self._get_dnsmasq(network_class())

View File

@ -1447,7 +1447,7 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
ct_zone=10)
def test_remove_conntrack_entries_for_port_sec_group_change_no_ct_zone(
self):
self):
self._test_remove_conntrack_entries_for_port_sec_group_change(
ct_zone=None)
@ -1497,7 +1497,7 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
'IPv4', direction, ct_zone=10)
def test_remove_conntrack_entries_for_sg_member_changed_ipv4_no_ct_zone(
self):
self):
for direction in ['ingress', 'egress']:
self._test_remove_conntrack_entries_sg_member_changed(
'IPv4', direction, ct_zone=None)
@ -1508,7 +1508,7 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
'IPv6', direction, ct_zone=10)
def test_remove_conntrack_entries_for_sg_member_changed_ipv6_no_ct_zone(
self):
self):
for direction in ['ingress', 'egress']:
self._test_remove_conntrack_entries_sg_member_changed(
'IPv6', direction, ct_zone=None)
@ -2375,7 +2375,7 @@ class OVSHybridIptablesFirewallTestCase(BaseIptablesFirewallTestCase):
# fill it up and then make sure an extra throws an error
for i in range(ip_conntrack.ZONE_START,
ip_conntrack.MAX_CONNTRACK_ZONES):
ip_conntrack.MAX_CONNTRACK_ZONES):
self.firewall.ipconntrack._device_zone_map['dev-%s' % i] = i
with testtools.ExpectedException(exceptions.CTZoneExhaustedError):
self.firewall.ipconntrack._find_open_zone()

View File

@ -149,8 +149,7 @@ class _TestMetadataProxyHandlerCacheMixin(object):
router_id = 'router-id'
ports = [{'network_id': 'network_id1', 'something': 42}]
expected_networks = ('network_id1',)
with mock.patch(
'oslo_utils.timeutils.utcnow_ts', return_value=0):
with mock.patch('oslo_utils.timeutils.utcnow_ts', return_value=0):
mock_get_ports = self.handler.plugin_rpc.get_ports
mock_get_ports.return_value = ports
networks = self.handler._get_router_networks(router_id)

View File

@ -134,8 +134,8 @@ class AgentPluginReportState(base.BaseTestCase):
class AgentRPCMethods(base.BaseTestCase):
def _test_create_consumers(
self, endpoints, method, expected, topics, listen):
def _test_create_consumers(self, endpoints, method, expected, topics,
listen):
with mock.patch.object(n_rpc, 'Connection') as create_connection:
rpc.create_consumers(
endpoints, method, topics, start_listening=listen)

View File

@ -966,8 +966,7 @@ class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
self.assertFalse(self.agent.firewall.defer_apply.called)
class SecurityGroupAgentEnhancedRpcTestCase(
BaseSecurityGroupAgentRpcTestCase):
class SecurityGroupAgentEnhancedRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
def setUp(self, defer_refresh_firewall=False):
super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp(
@ -1089,7 +1088,7 @@ class SecurityGroupAgentEnhancedRpcTestCase(
class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
SecurityGroupAgentRpcTestCase):
SecurityGroupAgentRpcTestCase):
def setUp(self):
super(SecurityGroupAgentRpcWithDeferredRefreshTestCase, self).setUp(
@ -2911,7 +2910,7 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
class TestSecurityGroupAgentEnhancedRpcWithIptables(
TestSecurityGroupAgentWithIptables):
TestSecurityGroupAgentWithIptables):
def setUp(self, defer_refresh_firewall=False):
super(TestSecurityGroupAgentEnhancedRpcWithIptables, self).setUp(
defer_refresh_firewall=defer_refresh_firewall, test_rpc_v1_1=False)

View File

@ -179,8 +179,9 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
{}, 'foo_network_id')
self.assertEqual(1, self.mock_fanout.call_count)
def _test__notify_agents_with_function(
self, function, expected_scheduling=0, expected_casts=0):
def _test__notify_agents_with_function(self, function,
expected_scheduling=0,
expected_casts=0):
with mock.patch.object(self.notifier, '_schedule_network') as f:
with mock.patch.object(self.notifier, '_get_enabled_agents') as g:
agent = agent_obj.Agent(mock.ANY, id=uuidutils.generate_uuid(),

View File

@ -63,7 +63,7 @@ class ResourceCallbacksManagerTestCaseMixin(object):
class ProducerResourceCallbacksManagerTestCase(
base.BaseQosTestCase, ResourceCallbacksManagerTestCaseMixin):
base.BaseQosTestCase, ResourceCallbacksManagerTestCaseMixin):
def setUp(self):
super(ProducerResourceCallbacksManagerTestCase, self).setUp()
@ -105,7 +105,7 @@ class ProducerResourceCallbacksManagerTestCase(
class ConsumerResourceCallbacksManagerTestCase(
base.BaseQosTestCase, ResourceCallbacksManagerTestCaseMixin):
base.BaseQosTestCase, ResourceCallbacksManagerTestCaseMixin):
def setUp(self):
super(ConsumerResourceCallbacksManagerTestCase, self).setUp()
@ -125,8 +125,7 @@ class ConsumerResourceCallbacksManagerTestCase(
self.mgr.register(callback2, 'TYPE')
@mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
def test_get_callbacks_fails_on_unregistered_callback(
self, *mocks):
def test_get_callbacks_fails_on_unregistered_callback(self, *mocks):
self.assertRaises(
rpc_exc.CallbackNotFound,
self.mgr.get_callbacks, 'TYPE')

View File

@ -1806,11 +1806,10 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
self.assertIn(ips[1]['ip_address'], network_ip_net)
def test_update_port_invalid_fixed_ip_address_v6_slaac(self):
with self.subnet(
cidr='2607:f0d0:1002:51::/64',
ip_version=constants.IP_VERSION_6,
ipv6_address_mode=constants.IPV6_SLAAC,
gateway_ip=constants.ATTR_NOT_SPECIFIED) as subnet:
with self.subnet(cidr='2607:f0d0:1002:51::/64',
ip_version=constants.IP_VERSION_6,
ipv6_address_mode=constants.IPV6_SLAAC,
gateway_ip=constants.ATTR_NOT_SPECIFIED) as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
ip_address = '2607:f0d0:1002:51::5'
@ -3603,9 +3602,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
cidr='10.0.1.0/24') as subnet2:
subnet1_id = subnet1['subnet']['id']
subnet2_id = subnet2['subnet']['id']
with self.port(
subnet=subnet1,
fixed_ips=[{'subnet_id': subnet1_id}]):
with self.port(subnet=subnet1,
fixed_ips=[{'subnet_id': subnet1_id}]):
req = self.new_delete_request('subnets', subnet2_id)
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code,
@ -4123,7 +4121,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
self._test_create_subnet(enable_dhcp=enable_dhcp)
def test_create_subnet_default_gw_conflict_allocation_pool_returns_409(
self):
self):
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.5'}]
@ -4458,7 +4456,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
ip_version=constants.IP_VERSION_6,
@ -4471,7 +4469,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
ip_version=constants.IP_VERSION_6,
@ -4483,7 +4481,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
def test_create_subnet_ipv6_ra_mode_ip_version_4(self):
cidr = '10.0.2.0/24'
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(cidr=cidr,
ip_version=constants.IP_VERSION_4,
ipv6_ra_mode=constants.DHCPV6_STATEFUL)
@ -4493,7 +4491,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
def test_create_subnet_ipv6_address_mode_ip_version_4(self):
cidr = '10.0.2.0/24'
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(
cidr=cidr, ip_version=constants.IP_VERSION_4,
ipv6_address_mode=constants.DHCPV6_STATEFUL)
@ -4506,8 +4504,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
insert_address_allocated=False):
# Create a network with one IPv4 subnet and one port
with self.network() as network,\
self.subnet(network=network) as v4_subnet,\
self.port(subnet=v4_subnet, device_owner=device_owner) as port:
self.subnet(network=network) as v4_subnet,\
self.port(subnet=v4_subnet, device_owner=device_owner) as port:
if insert_db_reference_error:
orig_fn = orm.Session.add
@ -4700,7 +4698,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
with self.network() as network:
with self.subnet(network=network), (
mock.patch.object(self.plugin, 'update_port')) as update_port:
mock.patch.object(self.plugin,
'update_port')) as update_port:
# Create port on second network
network2 = self._make_network(self.fmt, 'net2', True)
@ -5528,7 +5527,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
def test_subnet_lifecycle_dns_retains_order(self):
cfg.CONF.set_override('max_dns_nameservers', 3)
with self.subnet(dns_nameservers=['1.1.1.1', '2.2.2.2',
'3.3.3.3']) as subnet:
'3.3.3.3']) as subnet:
subnets = self._show('subnets', subnet['subnet']['id'],
expected_code=webob.exc.HTTPOk.code)
self.assertEqual(['1.1.1.1', '2.2.2.2', '3.3.3.3'],

View File

@ -234,7 +234,7 @@ class TestL3_NAT_dbonly_mixin(base.BaseTestCase):
'fixed_ips': [{'ip_address': '1.1.1.1', 'subnet_id': '4'}]}
with mock.patch.object(l3_obj.FloatingIP, 'objects_exist',
return_value=mock.Mock()),\
testtools.ExpectedException(n_exc.ServicePortInUse):
testtools.ExpectedException(n_exc.ServicePortInUse):
self.db.prevent_l3_port_deletion(ctx, None)
@ -337,14 +337,15 @@ class L3_NAT_db_mixin(base.BaseTestCase):
with mock.patch.object(l3_db.L3_NAT_dbonly_mixin, '_create_router_db',
return_value=router_db) as crd,\
mock.patch.object(l3_db.L3_NAT_dbonly_mixin, '_make_router_dict',
return_value=router_dict),\
mock.patch.object(l3_db.L3_NAT_dbonly_mixin,
'_update_router_gw_info') as urgi,\
mock.patch.object(l3_db.L3_NAT_dbonly_mixin, '_get_router',
return_value=router_db),\
mock.patch.object(l3_db.L3_NAT_db_mixin, 'notify_router_updated')\
as nru:
mock.patch.object(l3_db.L3_NAT_dbonly_mixin,
'_make_router_dict',
return_value=router_dict),\
mock.patch.object(l3_db.L3_NAT_dbonly_mixin,
'_update_router_gw_info') as urgi,\
mock.patch.object(l3_db.L3_NAT_dbonly_mixin, '_get_router',
return_value=router_db),\
mock.patch.object(l3_db.L3_NAT_db_mixin,
'notify_router_updated') as nru:
self.db.create_router(mock.Mock(), router_input)
self.assertTrue(crd.called)

View File

@ -310,7 +310,7 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
'foo_host')
def _setup_delete_current_gw_port_deletes_dvr_internal_ports(
self, port=None, gw_port=True, new_network_id='ext_net_id_2'):
self, port=None, gw_port=True, new_network_id='ext_net_id_2'):
router_db = {
'name': 'foo_router',
'admin_state_up': True,
@ -459,8 +459,7 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self.assertIn(const.FLOATINGIP_KEY, router)
self.assertIn(fip, router[const.FLOATINGIP_KEY])
def _setup_test_create_floatingip(
self, fip, floatingip_db, router_db):
def _setup_test_create_floatingip(self, fip, floatingip_db, router_db):
port = {
'id': '1234',
portbindings.HOST_ID: 'myhost',
@ -863,7 +862,7 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
'router', 'before_update', self.mixin, **kwargs)
def test_validate_add_router_interface_by_subnet_notify_advanced_services(
self):
self):
router = {'name': 'foo_router', 'admin_state_up': False}
router_db = self._create_router(router)
with self.network() as net, \
@ -879,7 +878,7 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
interface_info=interface_info)
def test_validate_add_router_interface_by_port_notify_advanced_services(
self):
self):
router = {'name': 'foo_router', 'admin_state_up': False}
router_db = self._create_router(router)
with self.network() as net, \
@ -975,8 +974,8 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self.ctx, router['id'],
{'router': {'external_gateway_info':
{'network_id': ext_net_id}}})
with mock.patch.object(
self.mixin, '_add_csnat_router_interface_port') as f:
with mock.patch.object(self.mixin,
'_add_csnat_router_interface_port') as f:
f.side_effect = RuntimeError()
self.assertRaises(
l3_exc.RouterInterfaceAttachmentConflict,
@ -1006,8 +1005,8 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
{'router': {'external_gateway_info':
{'network_id': ext_net_id}}})
net_id = subnet['subnet']['network_id']
with mock.patch.object(
router_obj.RouterPort, 'create') as rtrport_update:
with mock.patch.object(router_obj.RouterPort,
'create') as rtrport_update:
rtrport_update.side_effect = Exception()
self.assertRaises(
l3_exc.RouterInterfaceAttachmentConflict,

View File

@ -517,7 +517,7 @@ class TestCli(base.BaseTestCase):
@mock.patch.object(cli, '_compare_labels')
def test__validate_single_revision_labels_branchless_fail_different_labels(
self, compare_mock):
self, compare_mock):
fake_down_revision = FakeRevision()
fake_revision = FakeRevision(down_revision=fake_down_revision)
@ -535,7 +535,7 @@ class TestCli(base.BaseTestCase):
@mock.patch.object(cli, '_compare_labels')
def test__validate_single_revision_labels_branches_fail_different_labels(
self, compare_mock):
self, compare_mock):
fake_down_revision = FakeRevision()
fake_revision = FakeRevision(down_revision=fake_down_revision)
@ -563,7 +563,7 @@ class TestCli(base.BaseTestCase):
@mock.patch.object(cli, '_validate_single_revision_labels')
def test__validate_revision_validates_branchless_migrations(
self, validate_mock):
self, validate_mock):
script_dir = mock.Mock()
fake_revision = FakeRevision()
@ -573,7 +573,7 @@ class TestCli(base.BaseTestCase):
@mock.patch.object(cli, '_validate_revision')
@mock.patch('alembic.script.ScriptDirectory.walk_revisions')
def test_validate_revisions_walks_thru_all_revisions(
self, walk_mock, validate_mock):
self, walk_mock, validate_mock):
revisions = [FakeRevision() for i in range(10)]
walk_mock.return_value = revisions
@ -585,7 +585,7 @@ class TestCli(base.BaseTestCase):
@mock.patch.object(cli, '_validate_revision')
@mock.patch('alembic.script.ScriptDirectory.walk_revisions')
def test_validate_revisions_fails_on_multiple_branch_points(
self, walk_mock, validate_mock):
self, walk_mock, validate_mock):
revisions = [FakeRevision(is_branch_point=True) for i in range(2)]
walk_mock.return_value = revisions

View File

@ -77,7 +77,7 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
mock_publish.side_effect = exceptions.CallbackFailure(Exception())
secgroup = {'security_group': mock.ANY}
with testtools.ExpectedException(
securitygroup.SecurityGroupConflict):
securitygroup.SecurityGroupConflict):
self.mixin.create_security_group(self.ctx, secgroup)
def test_delete_security_group_in_use(self):
@ -86,8 +86,7 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
mock.patch.object(self.mixin, '_get_security_group'),\
mock.patch.object(registry, "notify") as mock_notify:
mock_notify.side_effect = exceptions.CallbackFailure(Exception())
with testtools.ExpectedException(
securitygroup.SecurityGroupInUse):
with testtools.ExpectedException(securitygroup.SecurityGroupInUse):
self.mixin.delete_security_group(self.ctx, mock.ANY)
def test_update_security_group_conflict(self):
@ -95,7 +94,7 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
mock_notify.side_effect = exceptions.CallbackFailure(Exception())
secgroup = {'security_group': mock.ANY}
with testtools.ExpectedException(
securitygroup.SecurityGroupConflict):
securitygroup.SecurityGroupConflict):
self.mixin.update_security_group(self.ctx, 'foo_id', secgroup)
def test_create_security_group_rule_conflict(self):
@ -105,7 +104,7 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
mock.patch.object(registry, "notify") as mock_notify:
mock_notify.side_effect = exceptions.CallbackFailure(Exception())
with testtools.ExpectedException(
securitygroup.SecurityGroupConflict):
securitygroup.SecurityGroupConflict):
self.mixin.create_security_group_rule(
self.ctx, mock.MagicMock())
@ -182,12 +181,12 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
with mock.patch.object(registry, "notify") as mock_notify:
mock_notify.side_effect = exceptions.CallbackFailure(Exception())
with testtools.ExpectedException(
securitygroup.SecurityGroupRuleInUse):
securitygroup.SecurityGroupRuleInUse):
self.mixin.delete_security_group_rule(self.ctx, mock.ANY)
def test_delete_security_group_rule_raise_error_on_not_found(self):
with testtools.ExpectedException(
securitygroup.SecurityGroupRuleNotFound):
securitygroup.SecurityGroupRuleNotFound):
self.mixin.delete_security_group_rule(self.ctx, 'foo_rule')
def test_validate_ethertype_and_protocol(self):
@ -220,14 +219,14 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
# test wrong protocols
for rule in fake_ipv4_rules:
with testtools.ExpectedException(
securitygroup.SecurityGroupEthertypeConflictWithProtocol):
securitygroup.SecurityGroupEthertypeConflictWithProtocol):
self.mixin._validate_ethertype_and_protocol(rule)
def test_security_group_precommit_create_event_fail(self):
registry.subscribe(fake_callback, resources.SECURITY_GROUP,
events.PRECOMMIT_CREATE)
with mock.patch.object(sqlalchemy.orm.session.SessionTransaction,
'rollback') as mock_rollback:
'rollback') as mock_rollback:
self.assertRaises(securitygroup.SecurityGroupConflict,
self.mixin.create_security_group,
self.ctx, FAKE_SECGROUP)
@ -359,8 +358,8 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
fake_rule = FAKE_SECGROUP_RULE
fake_rule['security_group_rule']['security_group_id'] = sg_dict['id']
with mock.patch.object(sqlalchemy.orm.session.SessionTransaction,
'rollback') as mock_rollback,\
mock.patch.object(self.mixin, '_get_security_group'):
'rollback') as mock_rollback,\
mock.patch.object(self.mixin, '_get_security_group'):
self.assertRaises(securitygroup.SecurityGroupConflict,
self.mixin.create_security_group_rule,
self.ctx, fake_rule)
@ -373,8 +372,8 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
fake_rule = FAKE_SECGROUP_RULE
fake_rule['security_group_rule']['security_group_id'] = sg_dict['id']
with mock.patch.object(sqlalchemy.orm.session.SessionTransaction,
'rollback') as mock_rollback,\
mock.patch.object(self.mixin, '_get_security_group'):
'rollback') as mock_rollback,\
mock.patch.object(self.mixin, '_get_security_group'):
sg_rule_dict = self.mixin.create_security_group_rule(self.ctx,
fake_rule)
self.assertRaises(securitygroup.SecurityGroupRuleInUse,
@ -387,7 +386,7 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
fake_rule = FAKE_SECGROUP_RULE
fake_rule['security_group_rule']['security_group_id'] = sg_dict['id']
with mock.patch.object(registry, "notify") as mock_notify, \
mock.patch.object(self.mixin, '_get_security_group'):
mock.patch.object(self.mixin, '_get_security_group'):
mock_notify.assert_has_calls([mock.call('security_group_rule',
'precommit_create', mock.ANY, context=mock.ANY,
security_group_rule=self.mixin.create_security_group_rule(
@ -398,7 +397,7 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
fake_rule = FAKE_SECGROUP_RULE
fake_rule['security_group_rule']['security_group_id'] = sg_dict['id']
with mock.patch.object(registry, "notify") as mock_notify, \
mock.patch.object(self.mixin, '_get_security_group'):
mock.patch.object(self.mixin, '_get_security_group'):
sg_rule_dict = self.mixin.create_security_group_rule(self.ctx,
fake_rule)
self.mixin.delete_security_group_rule(self.ctx,

View File

@ -65,7 +65,7 @@ class DataPlaneStatusExtensionTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
class DataPlaneStatusExtensionTestCase(
test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def setUp(self):
plugin = ('neutron.tests.unit.extensions.test_data_plane_status.'

View File

@ -311,47 +311,46 @@ class DnsExtensionTestCase(test_plugin.Ml2PluginV2TestCase):
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_pqdn_and_dns_domain_no_period(
self):
self):
cfg.CONF.set_override('dns_domain', 'example.com')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name='vm1')
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_pqdn_and_dns_domain_period(
self):
self):
cfg.CONF.set_override('dns_domain', 'example.com.')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name='vm1')
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_pqdn_and_no_dns_domain(
self):
self):
cfg.CONF.set_override('dns_domain', '')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets()
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_fqdn_and_dns_domain_no_period(
self):
self):
cfg.CONF.set_override('dns_domain', 'example.com')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name='vm1.example.com.')
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_fqdn_and_dns_domain_period(
self):
self):
cfg.CONF.set_override('dns_domain', 'example.com.')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name='vm1.example.com.')
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_fqdn_default_domain_period(
self):
self):
cfg.CONF.set_override('dns_domain', 'openstacklocal.')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets()
self.assertEqual(201, res.status_code)
def test_create_port_multiple_v4_v6_subnets_bad_fqdn_and_dns_domain(
self):
def test_create_port_multiple_v4_v6_subnets_bad_fqdn_and_dns_domain(self):
cfg.CONF.set_override('dns_domain', 'example.com')
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name='vm1.bad-domain.com.')
@ -361,8 +360,7 @@ class DnsExtensionTestCase(test_plugin.Ml2PluginV2TestCase):
'neutron.conf')
self.assertIn(expected_error, res.text)
def test_create_port_multiple_v4_v6_subnets_bad_pqdn_and_dns_domain(
self):
def test_create_port_multiple_v4_v6_subnets_bad_pqdn_and_dns_domain(self):
cfg.CONF.set_override('dns_domain', 'example.com')
num_labels = int(
math.floor(db_const.FQDN_FIELD_SIZE / constants.DNS_LABEL_MAX_LEN))
@ -508,7 +506,7 @@ class DnsExtensionTestCase(test_plugin.Ml2PluginV2TestCase):
class DnsExtensionTestNetworkDnsDomain(
test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def setUp(self):
plugin = ('neutron.tests.unit.extensions.test_dns.' +
'DnsExtensionTestPlugin')

View File

@ -91,7 +91,7 @@ class ExtraRouteDBTestCaseBase(object):
def test_route_update_with_external_route(self):
my_tenant = 'tenant1'
with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as ext_subnet,\
self.port(subnet=ext_subnet) as nexthop_port:
self.port(subnet=ext_subnet) as nexthop_port:
nexthop_ip = nexthop_port['port']['fixed_ips'][0]['ip_address']
routes = [{'destination': '135.207.0.0/16',
'nexthop': nexthop_ip}]
@ -107,7 +107,7 @@ class ExtraRouteDBTestCaseBase(object):
def test_route_update_with_route_via_another_tenant_subnet(self):
my_tenant = 'tenant1'
with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as subnet,\
self.port(subnet=subnet) as nexthop_port:
self.port(subnet=subnet) as nexthop_port:
nexthop_ip = nexthop_port['port']['fixed_ips'][0]['ip_address']
routes = [{'destination': '135.207.0.0/16',
'nexthop': nexthop_ip}]

View File

@ -567,9 +567,9 @@ class L3NatTestCaseMixin(object):
floatingip['floatingip']['id'])
@contextlib.contextmanager
def floatingip_no_assoc_with_public_sub(
self, private_sub, fmt=None, set_context=False,
public_sub=None, flavor_id=None, **kwargs):
def floatingip_no_assoc_with_public_sub(self, private_sub, fmt=None,
set_context=False, public_sub=None,
flavor_id=None, **kwargs):
self._set_net_external(public_sub['subnet']['network_id'])
args_list = {}
if flavor_id:
@ -604,8 +604,8 @@ class L3NatTestCaseMixin(object):
set_context=False, flavor_id=None, **kwargs):
with self.subnet(cidr='12.0.0.0/24') as public_sub:
with self.floatingip_no_assoc_with_public_sub(
private_sub, fmt, set_context, public_sub,
flavor_id, **kwargs) as (f, r):
private_sub, fmt, set_context, public_sub,
flavor_id, **kwargs) as (f, r):
# Yield only the floating ip object
yield f
@ -920,9 +920,10 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
def test_router_update_gateway_add_multiple_prefixes_ipv6(self):
with self.network() as n:
with self.subnet(network=n) as s1, \
self.subnet(network=n, ip_version=lib_constants.IP_VERSION_6,
cidr='2001:db8::/32') \
as s2, (self.router()) as r:
self.subnet(network=n,
ip_version=lib_constants.IP_VERSION_6,
cidr='2001:db8::/32') \
as s2, (self.router()) as r:
self._set_net_external(n['network']['id'])
res1 = self._add_external_gateway_to_router(
r['router']['id'],
@ -1421,8 +1422,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
p['port']['id'])
def test_update_router_interface_port_ip_not_allowed(self):
with self.router() as r, (
self.port()) as p:
with self.router() as r, self.port() as p:
body = self._router_interface_action('add',
r['router']['id'],
None,
@ -1889,7 +1889,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
expected_res_status=exc.HTTPConflict.code)
def test_create_non_router_port_device_id_of_other_tenants_router_update(
self):
self):
# This tests that HTTPConflict is raised if we create a non-router
# port that matches the device_id of another tenants router and then
# we change the device_owner to be network:router_interface.
@ -2294,7 +2294,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
'neutron.db.l3_db.L3_NAT_dbonly_mixin._check_and_get_fip_assoc')
def test_create_floatingip_with_assoc(
self, expected_status=lib_constants.FLOATINGIP_STATUS_ACTIVE):
self, expected_status=lib_constants.FLOATINGIP_STATUS_ACTIVE):
with self.floatingip_with_assoc() as fip:
body = self._show('floatingips', fip['floatingip']['id'])
self.assertEqual(body['floatingip']['id'],
@ -2341,7 +2341,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
port_body['status'])
def test_floatingip_update(
self, expected_status=lib_constants.FLOATINGIP_STATUS_ACTIVE):
self, expected_status=lib_constants.FLOATINGIP_STATUS_ACTIVE):
with self.port() as p:
private_sub = {'subnet': {'id':
p['port']['fixed_ips'][0]['subnet_id']}}
@ -2360,7 +2360,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
body['floatingip']['fixed_ip_address'])
def test_floatingip_update_subnet_gateway_disabled(
self, expected_status=lib_constants.FLOATINGIP_STATUS_ACTIVE):
self, expected_status=lib_constants.FLOATINGIP_STATUS_ACTIVE):
"""Attach a floating IP to an instance
Verify that the floating IP can be associated to a port whose subnet's
@ -2453,8 +2453,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
{'ip_address': str(ip_range[-2])}]
with self.port(subnet=s, fixed_ips=fixed_ips) as p:
with self.floatingip_with_assoc(
port_id=p['port']['id'],
fixed_ip=str(ip_range[-3])) as fip:
port_id=p['port']['id'],
fixed_ip=str(ip_range[-3])) as fip:
body = self._show('floatingips', fip['floatingip']['id'])
self.assertEqual(fip['floatingip']['id'],
body['floatingip']['id'])
@ -2477,7 +2477,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
with self.subnet() as s:
with self.port(subnet=s) as p:
with self.floatingip_with_assoc(
port_id=p['port']['id']) as fip:
port_id=p['port']['id']) as fip:
self._update(
'floatingips', fip['floatingip']['id'],
{'floatingip': {'port_id': p['port']['id'],
@ -2485,7 +2485,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
expected_code=exc.HTTPBadRequest.code)
def test_floatingip_update_to_same_port_id_twice(
self, expected_status=lib_constants.FLOATINGIP_STATUS_ACTIVE):
self, expected_status=lib_constants.FLOATINGIP_STATUS_ACTIVE):
with self.port() as p:
private_sub = {'subnet': {'id':
p['port']['fixed_ips'][0]['subnet_id']}}
@ -3119,8 +3119,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
# for the router. Otherwise the test would fail earlier without
# reaching the code we want to test. (bug 1556884)
with self.subnet(cidr="10.0.0.0/24") as exs, \
self.subnet(cidr="10.0.1.0/24") as ins1, \
self.subnet(cidr="10.0.2.0/24") as ins2:
self.subnet(cidr="10.0.1.0/24") as ins1, \
self.subnet(cidr="10.0.2.0/24") as ins2:
network_ex_id = exs['subnet']['network_id']
self._set_net_external(network_ex_id)
network_in2_id = ins2['subnet']['network_id']
@ -3180,7 +3180,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
# networking-midonet's L3 service plugin would do.
plugin = directory.get_plugin(plugin_constants.L3)
with mock.patch.object(plugin, "get_router_for_floatingip",
self._get_router_for_floatingip_without_device_owner_check):
self._get_router_for_floatingip_without_device_owner_check):
self._test_floatingip_via_router_interface(exc.HTTPCreated.code)
def test_floatingip_delete_router_intf_with_subnet_id_returns_409(self):

View File

@ -464,9 +464,9 @@ class ExtGwModeIntTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
'external_fixed_ips': [{
'ip_address': mock.ANY,
'subnet_id': s['subnet']['id']}]})]
with self.router(
name=name, admin_state_up=True, tenant_id=tenant_id,
external_gateway_info=input_value) as router:
with self.router(name=name, admin_state_up=True,
tenant_id=tenant_id,
external_gateway_info=input_value) as router:
res = self._show('routers', router['router']['id'])
for k, v in expected_value:
self.assertEqual(res['router'][k], v)

View File

@ -35,9 +35,8 @@ DB_PLUGIN_KLASS = ('neutron.tests.unit.extensions.test_portsecurity.'
'PortSecurityTestPlugin')
class PortSecurityTestCase(
test_securitygroup.SecurityGroupsTestCase,
test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
class PortSecurityTestCase(test_securitygroup.SecurityGroupsTestCase,
test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def setUp(self, plugin=None):
self._backup = copy.deepcopy(ext_sg.RESOURCE_ATTRIBUTE_MAP)

View File

@ -112,13 +112,11 @@ class SubnetOnboardTestsBase(object):
with self.subnetpool(self.ip_version,
prefixes=[self.cidr_to_onboard],
address_scope_id=addr_scope['id']) as source:
with self.subnetpool(
self.ip_version,
address_scope_id=addr_scope['id'],
prefixes=self.subnetpool_prefixes) as target:
with self.subnet(
cidr=self.cidr_to_onboard,
ip_version=self.ip_version) as subnet_to_onboard:
with self.subnetpool(self.ip_version,
address_scope_id=addr_scope['id'],
prefixes=self.subnetpool_prefixes) as target:
with self.subnet(cidr=self.cidr_to_onboard,
ip_version=self.ip_version) as subnet_to_onboard:
subnet_to_onboard = subnet_to_onboard['subnet']
# Onboard subnet into an initial subnet pool
@ -139,9 +137,8 @@ class SubnetOnboardTestsBase(object):
prefixes=self.subnetpool_prefixes) as source:
with self.subnetpool(self.ip_version,
prefixes=self.subnetpool_prefixes) as target:
with self.subnet(
cidr=self.cidr_to_onboard,
ip_version=self.ip_version) as subnet_to_onboard:
with self.subnet(cidr=self.cidr_to_onboard,
ip_version=self.ip_version) as subnet_to_onboard:
subnet_to_onboard = subnet_to_onboard['subnet']
# Onboard subnet into an initial subnet pool

View File

@ -166,10 +166,8 @@ class SubnetpoolPrefixOpsTestBase(object):
default_prefixlen=self.default_prefixlen,
min_prefixlen=self.default_prefixlen,
prefixes=self.subnetpool_prefixes) as subnetpool:
with self.subnet(
cidr=None,
subnetpool_id=subnetpool['id'],
ip_version=self.ip_version) as subnet:
with self.subnet(cidr=None, subnetpool_id=subnetpool['id'],
ip_version=self.ip_version) as subnet:
subnet = subnet['subnet']
prefixes_to_remove = [subnet['cidr']]
self.assertRaises(
@ -185,10 +183,8 @@ class SubnetpoolPrefixOpsTestBase(object):
default_prefixlen=self.default_prefixlen,
min_prefixlen=self.default_prefixlen,
prefixes=[self.subnetpool_prefixes[0]]) as subnetpool:
with self.subnet(
cidr=None,
subnetpool_id=subnetpool['id'],
ip_version=self.ip_version) as subnet:
with self.subnet(cidr=None, subnetpool_id=subnetpool['id'],
ip_version=self.ip_version) as subnet:
subnet = subnet['subnet']
prefixes_to_remove = [self.overlapping_cidr]
self.assertRaises(

View File

@ -231,9 +231,9 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
"""
def _create_and_allocate_ipam_subnet(
self, cidr, allocation_pools=constants.ATTR_NOT_SPECIFIED,
ip_version=constants.IP_VERSION_4, v6_auto_address=False,
tenant_id=None):
self, cidr, allocation_pools=constants.ATTR_NOT_SPECIFIED,
ip_version=constants.IP_VERSION_4, v6_auto_address=False,
tenant_id=None):
v6_address_mode = constants.ATTR_NOT_SPECIFIED
if v6_auto_address:
# set ip version to 6 regardless of what's been passed to the

View File

@ -183,9 +183,8 @@ class TestNovaNotify(base.BaseTestCase):
def test_delete_floatingip_deleted_port_no_notify(self):
port_id = 'bee50827-bcee-4cc8-91c1-a27b0ce54222'
with mock.patch.object(
directory.get_plugin(), 'get_port',
side_effect=n_exc.PortNotFound(port_id=port_id)):
with mock.patch.object(directory.get_plugin(), 'get_port',
side_effect=n_exc.PortNotFound(port_id=port_id)):
returned_obj = {'floatingip':
{'port_id': port_id}}
event = self.nova_notifier.create_port_changed_event(

View File

@ -16,14 +16,13 @@ from neutron.tests.unit import testlib_api
class UplinkStatusPropagationIfaceObjectTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = uplink_status_propagation.PortUplinkStatusPropagation
class UplinkStatusPropagationDbObjectTestCase(
obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
obj_test_base.BaseDbObjectTestCase, testlib_api.SqlTestCase):
_test_class = uplink_status_propagation.PortUplinkStatusPropagation

View File

@ -120,9 +120,8 @@ class QosPolicyObjectTestCase(test_base.BaseObjectIfaceTestCase):
[test_base.get_obj_persistent_fields(obj) for obj in objs])
def test_get_objects_valid_fields(self):
with mock.patch.object(
db_api, 'get_objects',
return_value=[self.db_objs[0]]) as get_objects_mock:
with mock.patch.object(db_api, 'get_objects',
return_value=[self.db_objs[0]]) as get_objects_mock:
objs = self._test_class.get_objects(
self.context,
**self.valid_field_filter)

View File

@ -18,7 +18,7 @@ from neutron.tests.unit import testlib_api
class AutoAllocateTopologyIfaceObjectTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = auto_allocate.AutoAllocatedTopology

View File

@ -1208,7 +1208,7 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
'_get_changed_persistent_fields',
return_value=fields_to_update):
with mock.patch.object(obj_db_api, 'get_objects',
side_effect=self.fake_get_objects):
side_effect=self.fake_get_objects):
obj = self._test_class(self.context, **self.obj_fields[0])
# get new values and fix keys
update_mock.return_value = self.db_objs[1]
@ -1251,9 +1251,8 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
with mock.patch.object(base.NeutronDbObject,
'_get_changed_persistent_fields',
return_value=fields_to_update):
with mock.patch.object(
obj_db_api, 'get_objects',
side_effect=self.fake_get_objects):
with mock.patch.object(obj_db_api, 'get_objects',
side_effect=self.fake_get_objects):
obj.update()
self._check_equal(self.objs[0], obj)

View File

@ -19,13 +19,13 @@ from neutron.tests.unit import testlib_api
class FlavorServiceProfileBindingIfaceObjectTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = flavor.FlavorServiceProfileBinding
class FlavorServiceProfileBindingDbObjectTestCase(
obj_test_base.BaseDbObjectTestCase, testlib_api.SqlTestCase):
obj_test_base.BaseDbObjectTestCase, testlib_api.SqlTestCase):
_test_class = flavor.FlavorServiceProfileBinding

View File

@ -32,7 +32,7 @@ class IpamSubnetDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
class IpamAllocationPoolObjectIfaceTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = ipam.IpamAllocationPool

View File

@ -18,7 +18,7 @@ from neutron.tests.unit import testlib_api
class L3HARouterAgentPortBindingIfaceObjectTestCase(
base.BaseObjectIfaceTestCase):
base.BaseObjectIfaceTestCase):
_test_class = l3_hamode.L3HARouterAgentPortBinding
@ -58,7 +58,7 @@ class L3HARouterNetworkDbObjectTestCase(base.BaseDbObjectTestCase,
class L3HARouterVRIdAllocationIfaceObjectTestCase(
base.BaseObjectIfaceTestCase):
base.BaseObjectIfaceTestCase):
_test_class = l3_hamode.L3HARouterVRIdAllocation

View File

@ -79,13 +79,13 @@ class NetworkRBACIfaceOjectTestCase(_NetworkRBACBase,
class NetworkDhcpAgentBindingObjectIfaceTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = network.NetworkDhcpAgentBinding
class NetworkDhcpAgentBindingDbObjectTestCase(
obj_test_base.BaseDbObjectTestCase, testlib_api.SqlTestCase):
obj_test_base.BaseDbObjectTestCase, testlib_api.SqlTestCase):
_test_class = network.NetworkDhcpAgentBinding
@ -263,7 +263,7 @@ class NetworkDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
class SegmentHostMappingIfaceObjectTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = network.SegmentHostMapping
@ -297,7 +297,7 @@ class NetworkDNSDomainDbObjectTestcase(obj_test_base.BaseDbObjectTestCase,
class ExternalNetworkIfaceObjectTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = network.ExternalNetwork

View File

@ -16,7 +16,7 @@ from neutron.tests.unit import testlib_api
class ProvisioningBlockIfaceObjectTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = provisioning_blocks.ProvisioningBlock

View File

@ -21,8 +21,7 @@ from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
class ResourceDeltaObjectIfaceTestCase(
obj_test_base.BaseObjectIfaceTestCase):
class ResourceDeltaObjectIfaceTestCase(obj_test_base.BaseObjectIfaceTestCase):
_test_class = quota.ResourceDelta
@ -42,8 +41,7 @@ class ResourceDeltaDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
self._reservation.create()
class ReservationObjectIfaceTestCase(
obj_test_base.BaseObjectIfaceTestCase):
class ReservationObjectIfaceTestCase(obj_test_base.BaseObjectIfaceTestCase):
_test_class = quota.Reservation

View File

@ -19,8 +19,7 @@ from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
class RouterRouteIfaceObjectTestCase(
obj_test_base.BaseObjectIfaceTestCase):
class RouterRouteIfaceObjectTestCase(obj_test_base.BaseObjectIfaceTestCase):
_test_class = router.RouterRoute

View File

@ -19,13 +19,12 @@ from neutron.tests.unit import testlib_api
class ProviderResourceAssociationIfaceObjectTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = servicetype.ProviderResourceAssociation
class ProviderResourceAssociationDbObjectTestCase(
obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
obj_test_base.BaseDbObjectTestCase, testlib_api.SqlTestCase):
_test_class = servicetype.ProviderResourceAssociation

View File

@ -25,7 +25,7 @@ from neutron.tests.unit import testlib_api
class IPAllocationPoolObjectIfaceTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = subnet.IPAllocationPool
@ -106,7 +106,7 @@ class RouteDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
class SubnetServiceTypeObjectIfaceTestCase(
obj_test_base.BaseObjectIfaceTestCase):
obj_test_base.BaseObjectIfaceTestCase):
_test_class = subnet.SubnetServiceType

View File

@ -64,7 +64,8 @@ class TestCommonAgentLoop(base.BaseTestCase):
manager.get_agent_configurations.return_value = {}
manager.get_rpc_consumers.return_value = []
with mock.patch.object(ca.CommonAgentLoop, '_validate_manager_class'),\
mock.patch.object(ca.CommonAgentLoop, '_validate_rpc_endpoints'):
mock.patch.object(ca.CommonAgentLoop,
'_validate_rpc_endpoints'):
self.agent = ca.CommonAgentLoop(manager, 0, 10, 'fake_agent',
'foo-binary')
with mock.patch.object(self.agent, "daemon_loop"):

View File

@ -22,7 +22,7 @@ from neutron.tests.unit.plugins.ml2.drivers.l2pop.rpc_manager \
class TestL2populationRpcCallBackTunnelMixin(
l2population_rpc_base.TestL2populationRpcCallBackTunnelMixinBase):
l2population_rpc_base.TestL2populationRpcCallBackTunnelMixinBase):
def test_get_agent_ports_no_data(self):
# Make sure vlan manager has no mappings that were added in setUp()
@ -127,8 +127,8 @@ class TestL2populationRpcCallBackTunnelMixin(
sorted(mock_add_fdb_flow.call_args_list))
def test_fdb_remove_tun(self):
with mock.patch.object(
self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
with mock.patch.object(self.fakeagent,
'del_fdb_flow') as mock_del_fdb_flow:
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
@ -168,8 +168,8 @@ class TestL2populationRpcCallBackTunnelMixin(
def test_fdb_remove_tun_non_existence_key_in_ofports(self):
del self.ofports[self.type_gre][self.ports[1].ip]
with mock.patch.object(
self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
with mock.patch.object(self.fakeagent,
'del_fdb_flow') as mock_del_fdb_flow:
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)

View File

@ -200,8 +200,8 @@ class QosLinuxbridgeAgentDriverTestCase(base.BaseTestCase):
tag=self._dscp_rule_tag(self.port['device'])
)
]
with mock.patch.object(
self.qos_driver, "iptables_manager") as iptables_manager:
with mock.patch.object(self.qos_driver,
"iptables_manager") as iptables_manager:
iptables_manager.ip4['mangle'] = mock.Mock()
iptables_manager.ip6['mangle'] = mock.Mock()
@ -225,8 +225,8 @@ class QosLinuxbridgeAgentDriverTestCase(base.BaseTestCase):
tag=self._dscp_rule_tag(self.port['device'])
)
]
with mock.patch.object(
self.qos_driver, "iptables_manager") as iptables_manager:
with mock.patch.object(self.qos_driver,
"iptables_manager") as iptables_manager:
iptables_manager.ip4['mangle'] = mock.Mock()
iptables_manager.ip6['mangle'] = mock.Mock()
@ -243,8 +243,8 @@ class QosLinuxbridgeAgentDriverTestCase(base.BaseTestCase):
mock.call.remove_chain(
dscp_chain_name),
]
with mock.patch.object(
self.qos_driver, "iptables_manager") as iptables_manager:
with mock.patch.object(self.qos_driver,
"iptables_manager") as iptables_manager:
iptables_manager.ip4['mangle'] = mock.Mock()
iptables_manager.ip6['mangle'] = mock.Mock()
@ -265,8 +265,8 @@ class QosLinuxbridgeAgentDriverTestCase(base.BaseTestCase):
mock.call.clear_rules_by_tag(
self._dscp_rule_tag(self.port['device'])),
]
with mock.patch.object(
self.qos_driver, "iptables_manager") as iptables_manager:
with mock.patch.object(self.qos_driver,
"iptables_manager") as iptables_manager:
iptables_manager.ip4['mangle'] = mock.Mock()
iptables_manager.ip6['mangle'] = mock.Mock()

View File

@ -450,15 +450,14 @@ class TestLinuxBridgeManager(base.BaseTestCase):
with mock.patch.object(ip_lib, 'device_exists', return_value=False):
vxlan_dev = mock.Mock()
with mock.patch.object(vxlan_dev, 'disable_ipv6') as dv6_fn,\
mock.patch.object(self.lbm.ip, 'add_vxlan',
return_value=vxlan_dev) as add_vxlan_fn,\
mock.patch.object(
vxlan_dev.link, 'set_mtu',
side_effect=ip_lib.InvalidArgument(
parameter="MTU", value=mtu)),\
mock.patch.object(ip_lib, 'get_device_mtu',
return_value=physical_mtu),\
mock.patch.object(vxlan_dev.link, 'delete') as delete_dev:
mock.patch.object(self.lbm.ip, 'add_vxlan',
return_value=vxlan_dev) as add_vxlan_fn,\
mock.patch.object(vxlan_dev.link, 'set_mtu',
side_effect=ip_lib.InvalidArgument(
parameter="MTU", value=mtu)),\
mock.patch.object(ip_lib, 'get_device_mtu',
return_value=physical_mtu),\
mock.patch.object(vxlan_dev.link, 'delete') as delete_dev:
self.assertFalse(
self.lbm.ensure_vxlan(seg_id, mtu=mtu))

View File

@ -80,7 +80,7 @@ class TestMacvtapManager(base.BaseTestCase):
good_mapping = {'physnet1': 'eth1', 'physnet2': 'eth2'}
self.mgr.interface_mappings = good_mapping
with mock.patch.object(ip_lib, 'device_exists', return_value=True)\
as mock_de:
as mock_de:
self.mgr.validate_interface_mappings()
mock_de.assert_any_call('eth1')
mock_de.assert_any_call('eth2')
@ -90,7 +90,7 @@ class TestMacvtapManager(base.BaseTestCase):
bad_mapping = {'physnet1': 'foo'}
self.mgr.interface_mappings = bad_mapping
with mock.patch.object(ip_lib, 'device_exists', return_value=False)\
as mock_de, mock.patch.object(sys, 'exit') as mock_exit:
as mock_de, mock.patch.object(sys, 'exit') as mock_exit:
self.mgr.validate_interface_mappings()
mock_de.assert_called_with('foo')
mock_exit.assert_called_once_with(1)
@ -116,9 +116,9 @@ class TestMacvtapManager(base.BaseTestCase):
listing = ['foo', 'macvtap0', 'macvtap1', 'bar']
# set some mac mappings to make sure they are cleaned up
self.mgr.mac_device_name_mappings = {'foo': 'bar'}
with mock.patch.object(os, 'listdir', return_value=listing)\
as mock_ld,\
mock.patch.object(ip_lib, 'get_device_mac') as mock_gdn:
with mock.patch.object(os, 'listdir',
return_value=listing) as mock_ld,\
mock.patch.object(ip_lib, 'get_device_mac') as mock_gdn:
mock_gdn.side_effect = ['mac0', 'mac1']
result = self.mgr.get_all_devices()
@ -143,7 +143,7 @@ class TestMacvtapManager(base.BaseTestCase):
mock_devices = []
with mock.patch.object(ip_lib.IPWrapper, 'get_devices',
return_value=mock_devices),\
mock.patch.object(sys, 'exit') as mock_exit:
mock.patch.object(sys, 'exit') as mock_exit:
self.mgr.get_agent_id()
mock_exit.assert_called_once_with(1)
@ -177,7 +177,7 @@ class TestMacvtapManager(base.BaseTestCase):
def test_plug_interface(self):
self.mgr.mac_device_name_mappings['mac1'] = 'macvtap0'
with mock.patch.object(ip_lib.IpLinkCommand, 'set_allmulticast_on')\
as mock_sao:
as mock_sao:
self.mgr.plug_interface('network_id', 'network_segment', 'mac1',
'device_owner')
self.assertTrue(mock_sao.called)
@ -197,7 +197,7 @@ class TestMacvtapMain(base.BaseTestCase):
'macvtap')
with mock.patch.object(helpers, 'parse_mappings',
side_effect=ValueError('bad mapping')),\
mock.patch.object(sys, 'exit') as mock_exit:
mock.patch.object(sys, 'exit') as mock_exit:
macvtap_neutron_agent.parse_interface_mappings()
mock_exit.assert_called_with(1)

View File

@ -707,8 +707,7 @@ class TestPciOsWrapper(base.BaseTestCase):
@mock.patch("os.listdir", return_value=["eth0", "eth1"])
@mock.patch("neutron.plugins.ml2.drivers.mech_sriov.agent.pci_lib."
"PciDeviceIPWrapper.is_macvtap_assigned", return_value=True)
def test_is_assigned_vf_macvtap(
self, mock_is_macvtap_assigned, *args):
def test_is_assigned_vf_macvtap(self, mock_is_macvtap_assigned, *args):
esm.PciOsWrapper.is_assigned_vf(self.DEV_NAME, self.VF_INDEX, '')
mock_is_macvtap_assigned.called_with(self.VF_INDEX, "eth0")
@ -716,8 +715,8 @@ class TestPciOsWrapper(base.BaseTestCase):
@mock.patch("os.listdir", side_effect=OSError())
@mock.patch("neutron.plugins.ml2.drivers.mech_sriov.agent.pci_lib."
"PciDeviceIPWrapper.is_macvtap_assigned")
def test_is_assigned_vf_macvtap_failure(
self, mock_is_macvtap_assigned, *args):
def test_is_assigned_vf_macvtap_failure(self, mock_is_macvtap_assigned,
*args):
esm.PciOsWrapper.is_assigned_vf(self.DEV_NAME, self.VF_INDEX, '')
self.assertFalse(mock_is_macvtap_assigned.called)

View File

@ -867,7 +867,7 @@ class TestOvsNeutronAgent(object):
details, mock.Mock(), 'treat_vif_port'))
def test_treat_devices_added_updated_sends_vif_port_into_extension_manager(
self, *args):
self, *args):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
port = mock.MagicMock()
@ -2206,8 +2206,8 @@ class TestOvsNeutronAgent(object):
self.assertFalse(cleanup.called)
def test_set_rpc_timeout(self):
with mock.patch.object(
n_rpc.BackingOffClient, 'set_max_timeout') as smt:
with mock.patch.object(n_rpc.BackingOffClient,
'set_max_timeout') as smt:
self.agent._handle_sigterm(None, None)
for rpc_client in (self.agent.plugin_rpc.client,
self.agent.sg_plugin_rpc.client,

View File

@ -110,7 +110,7 @@ class OpenvswitchMechanismBaseTestCase(base.AgentMechanismBaseTestCase):
class OpenvswitchMechanismSGDisabledBaseTestCase(
OpenvswitchMechanismBaseTestCase):
OpenvswitchMechanismBaseTestCase):
VIF_DETAILS = {'bridge_name': 'br-int',
portbindings.OVS_DATAPATH_TYPE: 'system',
portbindings.CAP_PORT_FILTER: False,
@ -214,13 +214,13 @@ class OpenvswitchMechanismGreTestCase(OpenvswitchMechanismBaseTestCase,
class OpenvswitchMechanismSGDisabledLocalTestCase(
OpenvswitchMechanismSGDisabledBaseTestCase,
base.AgentMechanismLocalTestCase):
OpenvswitchMechanismSGDisabledBaseTestCase,
base.AgentMechanismLocalTestCase):
pass
class OpenvswitchMechanismFirewallUndefinedTestCase(
OpenvswitchMechanismBaseTestCase, base.AgentMechanismLocalTestCase):
OpenvswitchMechanismBaseTestCase, base.AgentMechanismLocalTestCase):
def setUp(self):
# this simple test case just ensures backward compatibility where

View File

@ -420,7 +420,7 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
original_ips=original_ips)
def _assert_update_fixed_ips_no_effect_after_clearing_dns_attribute(
self, dns_data_db, dns_data_db_1, dns_data_db_2):
self, dns_data_db, dns_data_db_1, dns_data_db_2):
self.assertEqual('', dns_data_db_2['current_dns_name'])
self.assertEqual('', dns_data_db_2['current_dns_domain'])
self.assertEqual(dns_data_db_1['current_dns_name'],

View File

@ -18,19 +18,19 @@ from neutron.tests.unit.plugins.ml2 import test_plugin
class Ml2AgentSchedulerTestCase(
test_agentschedulers_db.OvsAgentSchedulerTestCase):
test_agentschedulers_db.OvsAgentSchedulerTestCase):
plugin_str = test_plugin.PLUGIN_NAME
l3_plugin = ('neutron.services.l3_router.'
'l3_router_plugin.L3RouterPlugin')
class Ml2L3AgentNotifierTestCase(
test_agentschedulers_db.OvsL3AgentNotifierTestCase):
test_agentschedulers_db.OvsL3AgentNotifierTestCase):
plugin_str = test_plugin.PLUGIN_NAME
l3_plugin = ('neutron.services.l3_router.'
'l3_router_plugin.L3RouterPlugin')
class Ml2DhcpAgentNotifierTestCase(
test_agentschedulers_db.OvsDhcpAgentNotifierTestCase):
test_agentschedulers_db.OvsDhcpAgentNotifierTestCase):
plugin_str = test_plugin.PLUGIN_NAME

View File

@ -591,7 +591,7 @@ class TestMl2NetworksWithVlanTransparencyBase(TestMl2NetworksV2):
class TestMl2NetworksWithVlanTransparency(
TestMl2NetworksWithVlanTransparencyBase):
TestMl2NetworksWithVlanTransparencyBase):
_mechanism_drivers = ['test']
def test_create_network_vlan_transparent_fail(self):
@ -617,7 +617,7 @@ class TestMl2NetworksWithVlanTransparency(
class TestMl2NetworksWithVlanTransparencyAndMTU(
TestMl2NetworksWithVlanTransparencyBase):
TestMl2NetworksWithVlanTransparencyBase):
_mechanism_drivers = ['test']
def test_create_network_vlan_transparent_and_mtu(self):
@ -2414,8 +2414,8 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
'id': 'foo_port_id',
portbindings.HOST_ID: 'foo_host',
}
with mock.patch.object(
ml2_db, 'ensure_distributed_port_binding') as mock_dist:
with mock.patch.object(ml2_db,
'ensure_distributed_port_binding') as mock_dist:
plugin.update_distributed_port_binding(
self.context, 'foo_port_id', {'port': port})
self.assertFalse(mock_dist.called)
@ -3288,8 +3288,7 @@ class TestML2Segments(Ml2PluginV2TestCase):
def test_create_network_mtu_on_precommit(self):
with mock.patch.object(mech_test.TestMechanismDriver,
'create_network_precommit') as bmp:
with mock.patch.object(
self.driver, '_get_network_mtu') as mtu:
with mock.patch.object(self.driver, '_get_network_mtu') as mtu:
mtu.return_value = 1100
with self.network() as network:
self.assertIn('mtu', network['network'])
@ -3310,8 +3309,7 @@ class TestML2Segments(Ml2PluginV2TestCase):
def test_reserve_segment_update_network_mtu(self):
with self.network() as network:
network_id = network['network']['id']
with mock.patch.object(
self.driver, '_get_network_mtu') as mtu:
with mock.patch.object(self.driver, '_get_network_mtu') as mtu:
mtu.return_value = 100
self._reserve_segment(network)
updated_network = self.driver.get_network(self.context,

View File

@ -62,7 +62,7 @@ class RpcCallbacksTestCase(base.BaseTestCase):
}
with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
'._device_to_port_id'),\
mock.patch.object(self.callbacks, 'notify_l2pop_port_wiring'):
mock.patch.object(self.callbacks, 'notify_l2pop_port_wiring'):
with mock.patch('neutron.db.provisioning_blocks.'
'provisioning_complete') as pc:
self.callbacks.update_device_up(mock.Mock(), **kwargs)

View File

@ -163,7 +163,6 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
self.assertFalse(self.was_active)
class TestMl2SGServerRpcCallBack(
Ml2SecurityGroupsTestCase,
test_sg_rpc.SGServerRpcCallBackTestCase):
class TestMl2SGServerRpcCallBack(Ml2SecurityGroupsTestCase,
test_sg_rpc.SGServerRpcCallBackTestCase):
pass

View File

@ -525,9 +525,8 @@ class TestNetworksFailover(TestDhcpSchedulerBaseTestCase,
def test_reschedule_network_from_down_agent_failed_on_unexpected(self):
agents = self._create_and_set_agents_down(['host-a'], 1)
self._test_schedule_bind_network([agents[0]], self.network_id)
with mock.patch.object(
self, '_filter_bindings',
side_effect=Exception()):
with mock.patch.object(self, '_filter_bindings',
side_effect=Exception()):
# just make sure that no exception is raised
self.remove_networks_from_down_agents()
@ -542,8 +541,7 @@ class TestNetworksFailover(TestDhcpSchedulerBaseTestCase,
def test_reschedule_doesnt_occur_if_no_agents(self):
agents = self._create_and_set_agents_down(['host-a', 'host-b'], 2)
self._test_schedule_bind_network([agents[0]], self.network_id)
with mock.patch.object(
self, 'remove_network_from_dhcp_agent') as rn:
with mock.patch.object(self, 'remove_network_from_dhcp_agent') as rn:
self.remove_networks_from_down_agents()
self.assertFalse(rn.called)

View File

@ -1192,7 +1192,7 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin,
1,
l3plugin.delete_arp_entry_for_dvr_service_port.call_count)
if fip and is_distributed and not (routers_to_remove and
fip['router_id'] is routers_to_remove[0]['router_id']):
fip['router_id'] is routers_to_remove[0]['router_id']):
(l3plugin.l3_rpc_notifier.routers_updated_on_host.
assert_called_once_with(mock.ANY, ['router_id'], source_host))
self.assertEqual(

View File

@ -101,8 +101,8 @@ class TestDriverController(testlib_api.SqlTestCase):
with mock.patch.object(registry, "publish") as mock_cb:
with mock.patch.object(test_dc, "get_provider_for_router"):
with mock.patch.object(
driver_controller,
"_ensure_driver_supports_request") as _ensure:
driver_controller,
"_ensure_driver_supports_request") as _ensure:
_ensure.side_effect = lib_exc.InvalidInput(
error_message='message')
self.assertRaises(
@ -119,8 +119,8 @@ class TestDriverController(testlib_api.SqlTestCase):
with mock.patch.object(registry, "publish"):
with mock.patch.object(test_dc, "get_provider_for_router"):
with mock.patch.object(
driver_controller,
"_ensure_driver_supports_request") as _ensure:
driver_controller,
"_ensure_driver_supports_request") as _ensure:
_ensure.side_effect = lib_exc.InvalidInput(
error_message='message')
with mock.patch(

View File

@ -23,8 +23,8 @@ class BaseLogTestCase(testlib_api.SqlTestCase):
super(BaseLogTestCase, self).setUp()
with mock.patch.object(
resource_manager.ResourceCallbacksManager, '_singleton',
new_callable=mock.PropertyMock(return_value=False)):
resource_manager.ResourceCallbacksManager, '_singleton',
new_callable=mock.PropertyMock(return_value=False)):
self.cons_mgr = resource_manager.ConsumerResourceCallbacksManager()
self.prod_mgr = resource_manager.ProducerResourceCallbacksManager()

View File

@ -48,8 +48,8 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
super(TestPortForwardingPlugin, self).setUp()
with mock.patch.object(
resource_manager.ResourceCallbacksManager, '_singleton',
new_callable=mock.PropertyMock(return_value=False)):
resource_manager.ResourceCallbacksManager, '_singleton',
new_callable=mock.PropertyMock(return_value=False)):
self.cons_mgr = resource_manager.ConsumerResourceCallbacksManager()
self.prod_mgr = resource_manager.ProducerResourceCallbacksManager()

View File

@ -23,8 +23,8 @@ class BaseQosTestCase(testlib_api.SqlTestCase):
super(BaseQosTestCase, self).setUp()
with mock.patch.object(
resource_manager.ResourceCallbacksManager, '_singleton',
new_callable=mock.PropertyMock(return_value=False)):
resource_manager.ResourceCallbacksManager, '_singleton',
new_callable=mock.PropertyMock(return_value=False)):
self.cons_mgr = resource_manager.ConsumerResourceCallbacksManager()
self.prod_mgr = resource_manager.ProducerResourceCallbacksManager()

View File

@ -230,7 +230,7 @@ class TestOVSDBHandler(base.BaseTestCase):
@mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
def test_unwire_subports_for_trunk_trunk_manager_failure(self, br):
with mock.patch.object(
self.ovsdb_handler, '_update_trunk_metadata') as f:
self.ovsdb_handler, '_update_trunk_metadata') as f:
self.trunk_manager.remove_sub_port.side_effect = (
trunk_manager.TrunkManagerError(error='error'))
status = self.ovsdb_handler.unwire_subports_for_trunk(

View File

@ -105,7 +105,7 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
trunk_port_validator = rules.TrunkPortValidator(trunk['port_id'])
if not trunk_port_validator.can_be_trunked_or_untrunked(
self.context):
self.context):
self.assertRaises(trunk_exc.TrunkInUse,
self.trunk_plugin.delete_trunk,
self.context, trunk['id'])

View File

@ -309,7 +309,7 @@ class RequestDeserializerTest(testtools.TestCase):
def test_deserialize(self):
"""Test RequestDeserializer.deserialize."""
with mock.patch.object(
self.deserializer, 'get_action_args') as mock_method:
self.deserializer, 'get_action_args') as mock_method:
mock_method.return_value = {'action': 'create'}
request = wsgi.Request.blank('/')
request.headers['Accept'] = 'application/json'

View File

@ -56,9 +56,8 @@ class PortAllocationTestCase(base.DietTestCase):
for p in range(1024, 32767):
ss_output2 += ss_output_template % p
with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') \
as ipwrapper, \
mock.patch('neutron.agent.linux.utils.execute') as ex:
with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ipwrapper, \
mock.patch('neutron.agent.linux.utils.execute') as ex:
m = mock.MagicMock()
m.netns.execute.return_value = ss_output2
ipwrapper.return_value = m

View File

@ -156,7 +156,6 @@ deps = -r{toxinidir}/doc/requirements.txt
commands = sphinx-build -W -b linkcheck doc/source doc/build/linkcheck
[flake8]
# E125 continuation line does not distinguish itself from next logical line
# E126 continuation line over-indented for hanging indent
# E128 continuation line under-indented for visual indent
# H405 multi line docstring summary not separated with an empty line
@ -164,7 +163,7 @@ commands = sphinx-build -W -b linkcheck doc/source doc/build/linkcheck
# TODO(amotoki) check the following new rules should be fixed or ignored
# E731 do not assign a lambda expression, use a def
# W504 line break after binary operator
ignore = E125,E126,E128,E731,H405,N530,W504
ignore = E126,E128,E731,H405,N530,W504
# H106: Don't put vim configuration in source files
# H203: Use assertIs(Not)None to check for None
# H204: Use assert(Not)Equal to check for equality