pyupgrade changes for Python3.9+

As discussed at the Epoxy PTG meeting, run an automated
upgrade tool to make code python 3.9+ compliant.

Result of running:

$ pyupgrade --py39-plus $(git ls-files | grep ".py$")

Fixed PEP8 errors introduced by pyupgrade by running:

$ autopep8 --select=E127,E128,E501 --max-line-length 79 -r \
  --in-place neutron_fwaas

Also did manual updates as necessary to fix other errors
and warnings after above commands.

Inspired by Octavia and Nova [0].

[0] https://review.opendev.org/c/openstack/nova/+/896986

Change-Id: I28003aa9e42479ac966a05edd174fc0435e57e3a
This commit is contained in:
elajkat 2024-11-06 13:24:42 +01:00 committed by Lajos Katona
parent bef88ac0eb
commit 070113c906
66 changed files with 393 additions and 370 deletions

View File

@ -43,7 +43,6 @@ disable=
global-variable-not-assigned,
logging-not-lazy,
no-init,
non-parent-init-called,
pointless-string-statement,
protected-access,
redefined-builtin,

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2010 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -82,8 +82,8 @@ class FirewallRouterAssociation(model_base.BASEV2):
__tablename__ = 'firewall_router_associations'
fw_id = sa.Column(sa.String(36),
sa.ForeignKey('firewalls.id', ondelete="CASCADE"),
primary_key=True)
sa.ForeignKey('firewalls.id', ondelete="CASCADE"),
primary_key=True)
router_id = sa.Column(sa.String(36),
sa.ForeignKey('routers.id', ondelete="CASCADE"),
primary_key=True)
sa.ForeignKey('routers.id', ondelete="CASCADE"),
primary_key=True)

View File

@ -59,11 +59,11 @@ class FirewallDefaultObjectUpdateRestricted(FirewallDefaultParameterExists):
"'%(resource_id)s' of type %(resource_type)s.")
class HasName(object):
class HasName:
name = sa.Column(sa.String(db_constants.NAME_FIELD_SIZE))
class HasDescription(object):
class HasDescription:
description = sa.Column(
sa.String(db_constants.LONG_DESCRIPTION_FIELD_SIZE))
@ -188,7 +188,7 @@ def _list_firewall_policies_result_filter_hook(query, filters):
return query
class FirewallPluginDb(object):
class FirewallPluginDb:
def __new__(cls, *args, **kwargs):
model_query.register_hook(
@ -204,7 +204,7 @@ class FirewallPluginDb(object):
query_hook=None,
filter_hook=None,
result_filters=_list_firewall_policies_result_filter_hook)
return super(FirewallPluginDb, cls).__new__(cls, *args, **kwargs)
return super().__new__(cls, *args, **kwargs)
def _get_firewall_group(self, context, id):
try:
@ -254,7 +254,7 @@ class FirewallPluginDb(object):
def _validate_fwr_port_range(self, min_port, max_port):
if int(min_port) > int(max_port):
port_range = '%s:%s' % (min_port, max_port)
port_range = '{}:{}'.format(min_port, max_port)
raise f_exc.FirewallRuleInvalidPortValue(port=port_range)
def _get_min_max_ports_from_range(self, port_range):
@ -272,7 +272,7 @@ class FirewallPluginDb(object):
if min_port == max_port:
return str(min_port)
self._validate_fwr_port_range(min_port, max_port)
return '%s:%s' % (min_port, max_port)
return '{}:{}'.format(min_port, max_port)
def _make_firewall_rule_dict(self, firewall_rule, fields=None,
policies=None):
@ -656,7 +656,7 @@ class FirewallPluginDb(object):
rule_id_list, filters):
rules_in_fwr_db = model_query.get_collection_query(
context, FirewallRuleV2, filters=filters)
rules_dict = dict((fwr_db['id'], fwr_db) for fwr_db in rules_in_fwr_db)
rules_dict = {fwr_db['id']: fwr_db for fwr_db in rules_in_fwr_db}
for fwrule_id in rule_id_list:
if fwrule_id not in rules_dict:
# Bail as soon as we find an invalid rule.
@ -710,7 +710,7 @@ class FirewallPluginDb(object):
with db_api.CONTEXT_READER.using(context):
fwg_with_fwp_id_db = context.session.query(FirewallGroup).filter(
or_(FirewallGroup.ingress_firewall_policy_id == fwp_id,
FirewallGroup.egress_firewall_policy_id == fwp_id))
FirewallGroup.egress_firewall_policy_id == fwp_id))
for entry in fwg_with_fwp_id_db:
if entry.tenant_id != fwp_tenant_id:
raise f_exc.FirewallPolicyInUse(
@ -742,7 +742,7 @@ class FirewallPluginDb(object):
filters = {'firewall_rule_id': [r_id for r_id in rule_id_list]}
# Run a validation on the Firewall Rules table
self._check_rules_for_policy_is_valid(context, fwp, fwp_db,
rule_id_list, filters)
rule_id_list, filters)
# new rules are valid, lets delete the old association
self._delete_all_rules_from_policy(context, fwp_db)
# and add in the new association
@ -754,8 +754,8 @@ class FirewallPluginDb(object):
context,
FirewallPolicyRuleAssociation,
filters=filters)
rules_dict = dict((fpol_rul_db['firewall_rule_id'], fpol_rul_db)
for fpol_rul_db in rules_in_fpol_rul_db)
rules_dict = {fpol_rul_db['firewall_rule_id']: fpol_rul_db
for fpol_rul_db in rules_in_fpol_rul_db}
fwp_db.rule_associations = []
for fwrule_id in rule_id_list:
fwp_db.rule_associations.append(rules_dict[fwrule_id])
@ -800,7 +800,7 @@ class FirewallPluginDb(object):
with db_api.CONTEXT_WRITER.using(context):
fwp_db = self._get_firewall_policy(context, id)
self._ensure_not_default_resource(fwp_db, 'firewall_policy',
action="update")
action="update")
if not fwp.get('shared', True):
# an update is setting shared to False, make sure associated
# firewall groups are in the same project.

View File

@ -41,14 +41,15 @@ SQL_STATEMENT = (
def upgrade():
op.create_table('firewall_router_associations',
sa.Column('fw_id', sa.String(length=36), nullable=False),
sa.Column('router_id', sa.String(length=36), nullable=False),
sa.ForeignKeyConstraint(['fw_id'], ['firewalls.id'],
ondelete='CASCADE'),
sa.ForeignKeyConstraint(['router_id'], ['routers.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('fw_id', 'router_id'),
)
sa.Column('fw_id', sa.String(length=36), nullable=False),
sa.Column('router_id', sa.String(
length=36), nullable=False),
sa.ForeignKeyConstraint(['fw_id'], ['firewalls.id'],
ondelete='CASCADE'),
sa.ForeignKeyConstraint(['router_id'], ['routers.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('fw_id', 'router_id'),
)
# Depending on when neutron-fwaas is installed with neutron, this script
# may be run before or after the neutron core tables have had their

View File

@ -32,14 +32,16 @@ down_revision = '540142f314f4'
def upgrade(active_plugins=None, options=None):
op.create_table('cisco_firewall_associations',
sa.Column('fw_id', sa.String(length=36), nullable=False),
sa.Column('port_id', sa.String(length=36), nullable=True),
sa.Column('direction', sa.String(length=16), nullable=True),
sa.Column('acl_id', sa.String(length=36), nullable=True),
sa.Column('router_id', sa.String(length=36), nullable=True),
sa.ForeignKeyConstraint(['fw_id'], ['firewalls.id'],
ondelete='CASCADE'),
sa.ForeignKeyConstraint(['port_id'], ['ports.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('fw_id')
)
sa.Column('fw_id', sa.String(length=36), nullable=False),
sa.Column('port_id', sa.String(length=36), nullable=True),
sa.Column('direction', sa.String(
length=16), nullable=True),
sa.Column('acl_id', sa.String(length=36), nullable=True),
sa.Column('router_id', sa.String(
length=36), nullable=True),
sa.ForeignKeyConstraint(['fw_id'], ['firewalls.id'],
ondelete='CASCADE'),
sa.ForeignKeyConstraint(['port_id'], ['ports.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('fw_id')
)

View File

@ -171,7 +171,7 @@ def decode(nfa):
'msg': encodeutils.safe_decode(msg)}
class NFLogWrapper(object):
class NFLogWrapper:
"""A wrapper for libnetfilter_log api"""
_instance = None
@ -304,7 +304,7 @@ def run_nflog(namespace=None, group=0):
return pid
class NFLogApp(object):
class NFLogApp:
"""Log application for handling nflog packets"""
callback = None

View File

@ -123,7 +123,7 @@ class ConntrackOpenFailedExit(SystemExit):
"""Raised if we fail to open a new conntrack or conntrack handler"""
class ConntrackManager(object):
class ConntrackManager:
def __init__(self, family_socket=None):
self.family_socket = family_socket
self.set_functions = {
@ -147,14 +147,14 @@ class ConntrackManager(object):
6: nfct.nfct_set_attr_u16}, }
self.converters = {'src': bytes,
'dst': bytes,
'ipversion': nl_constants.IPVERSION_SOCKET.get,
'protocol': constants.IP_PROTOCOL_MAP.get,
'code': int,
'type': int,
'id': libc.htons,
'sport': libc.htons,
'dport': libc.htons, }
'dst': bytes,
'ipversion': nl_constants.IPVERSION_SOCKET.get,
'protocol': constants.IP_PROTOCOL_MAP.get,
'code': int,
'type': int,
'id': libc.htons,
'sport': libc.htons,
'dport': libc.htons, }
def list_entries(self):
entries = []

View File

@ -48,7 +48,7 @@ class FirewallPluginV2(Firewallv2PluginBase):
path_prefix = firewall_v2.API_PREFIX
def __init__(self):
super(FirewallPluginV2, self).__init__()
super().__init__()
"""Do the initialization for the firewall service plugin here."""
# Initialize the Firewall v2 service plugin
service_type_manager = st_db.ServiceTypeManager.get_instance()

View File

@ -31,7 +31,7 @@ from neutron_fwaas.services.firewall.service_drivers import driver_api
LOG = logging.getLogger(__name__)
class FirewallAgentCallbacks(object):
class FirewallAgentCallbacks:
target = oslo_messaging.Target(version='1.0')
def __init__(self, firewall_db):
@ -106,7 +106,7 @@ class FirewallAgentCallbacks(object):
"""Get all projects that have firewall_groups."""
ctx = neutron_context.get_admin_context()
fwg_list = self.firewall_db.get_firewall_groups(ctx)
fwg_project_list = list(set(fwg['tenant_id'] for fwg in fwg_list))
fwg_project_list = list({fwg['tenant_id'] for fwg in fwg_list})
return fwg_project_list
@log_helpers.log_method_call
@ -136,7 +136,7 @@ class FirewallAgentCallbacks(object):
return fwg
class FirewallAgentApi(object):
class FirewallAgentApi:
"""Plugin side of plugin to agent RPC API"""
def __init__(self, topic, host):
@ -169,7 +169,7 @@ class FirewallAgentDriver(driver_api.FirewallDriverDB,
"""
def __init__(self, service_plugin):
super(FirewallAgentDriver, self).__init__(service_plugin)
super().__init__(service_plugin)
self.agent_rpc = FirewallAgentApi(constants.FW_AGENT, cfg.CONF.host)
def is_supported_l2_port(self, port):
@ -250,7 +250,7 @@ class FirewallAgentDriver(driver_api.FirewallDriverDB,
ports = firewall_group['ports']
if (not ports or (not firewall_group['ingress_firewall_policy_id'] and
not firewall_group['egress_firewall_policy_id'])):
not firewall_group['egress_firewall_policy_id'])):
# no messaging to agent needed and fw needs to go to INACTIVE state
# as no associated ports and/or no policy configured.
status = nl_constants.INACTIVE
@ -317,9 +317,9 @@ class FirewallAgentDriver(driver_api.FirewallDriverDB,
not new_firewall_group['ports'])
LOG.debug("update_firewall_group %s: Add Ports: %s, Del Ports: %s",
new_firewall_group['id'],
fwg_with_rules['add-port-ids'],
fwg_with_rules['del-port-ids'])
new_firewall_group['id'],
fwg_with_rules['add-port-ids'],
fwg_with_rules['del-port-ids'])
fwg_with_rules['port_details'] = self._get_fwg_port_details(
context, fwg_with_rules['del-port-ids'])

View File

@ -37,7 +37,7 @@ def load_and_init_conntrack_driver(*args, **kwargs):
return conntrack_driver
class ConntrackDriverBase(object, metaclass=abc.ABCMeta):
class ConntrackDriverBase(metaclass=abc.ABCMeta):
"""Base Driver for Conntrack"""
@abc.abstractmethod

View File

@ -16,7 +16,7 @@
import abc
class FwaasDriverBase(object, metaclass=abc.ABCMeta):
class FwaasDriverBase(metaclass=abc.ABCMeta):
"""Firewall as a Service Driver base class.
Using FwaasDriver Class, an instance of L3 perimeter Firewall

View File

@ -16,7 +16,7 @@
import abc
class FwaasDriverBase(object, metaclass=abc.ABCMeta):
class FwaasDriverBase(metaclass=abc.ABCMeta):
"""Firewall as a Service Driver base class.
Using FwaasDriver Class, an instance of L3 perimeter Firewall

View File

@ -69,7 +69,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
self.conntrack = conntrack_base.load_and_init_conntrack_driver()
def _get_intf_name(self, if_prefix, port_id):
_name = "%s%s" % (if_prefix, port_id)
_name = "{}{}".format(if_prefix, port_id)
return _name[:MAX_INTF_NAME_LEN]
def create_firewall_group(self, agent_mode, apply_list, firewall):
@ -140,11 +140,12 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
if firewall['admin_state_up']:
self._setup_firewall(agent_mode, apply_list, firewall)
if self.pre_firewall:
self._remove_conntrack_updated_firewall(agent_mode,
apply_list, self.pre_firewall, firewall)
self._remove_conntrack_updated_firewall(
agent_mode,
apply_list, self.pre_firewall, firewall)
else:
self._remove_conntrack_new_firewall(agent_mode,
apply_list, firewall)
apply_list, firewall)
else:
self.apply_default_policy(agent_mode, apply_list, firewall)
self.pre_firewall = dict(firewall)
@ -210,9 +211,9 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
ipt_mgr.defer_apply_off()
def _get_chain_name(self, fwid, ver, direction):
return '%s%s%s' % (CHAIN_NAME_PREFIX[direction],
IP_VER_TAG[ver],
fwid)
return '{}{}{}'.format(CHAIN_NAME_PREFIX[direction],
IP_VER_TAG[ver],
fwid)
def _setup_chains(self, firewall, ipt_if_prefix, router_fw_ports):
"""Create Fwaas chain using the rules in the policy
@ -293,7 +294,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
fw_rules = firewall[fw_rule_list]
fw_rule_ids = [fw_rule['id'] for fw_rule in fw_rules]
removed_rules.extend([pre_fw_rule for pre_fw_rule in pre_fw_rules
if pre_fw_rule['id'] not in fw_rule_ids])
if pre_fw_rule['id'] not in fw_rule_ids])
return removed_rules
def _find_new_rules(self, pre_firewall, firewall):
@ -301,7 +302,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
def _remove_conntrack_new_firewall(self, agent_mode, apply_list, firewall):
"""Remove conntrack when create new firewall"""
routers_list = list(set([apply_info[0] for apply_info in apply_list]))
routers_list = list({apply_info[0] for apply_info in apply_list})
for ri in routers_list:
ipt_if_prefix_list = self._get_ipt_mgrs_with_if_prefix(
agent_mode, ri)
@ -312,7 +313,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
def _remove_conntrack_updated_firewall(self, agent_mode,
apply_list, pre_firewall, firewall):
"""Remove conntrack when updated firewall"""
routers_list = list(set([apply_info[0] for apply_info in apply_list]))
routers_list = list({apply_info[0] for apply_info in apply_list})
for ri in routers_list:
ipt_if_prefix_list = self._get_ipt_mgrs_with_if_prefix(
agent_mode, ri)
@ -418,7 +419,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
def _get_action_chain(self, name):
binary_name = iptables_manager.binary_name
chain_name = iptables_manager.get_chain_name(name)
return '%s-%s' % (binary_name, chain_name)
return '{}-{}'.format(binary_name, chain_name)
def _enable_policy_chain(self, fwid, ipt_if_prefix, router_fw_ports):
bname = iptables_manager.binary_name
@ -435,18 +436,18 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
for router_fw_port in router_fw_ports:
intf_name = self._get_intf_name(if_prefix,
router_fw_port)
jump_rule = ['%s %s -j %s-%s' % (
jump_rule = ['{} {} -j {}-{}'.format(
IPTABLES_DIR[direction], intf_name,
bname, chain_name)]
self._add_rules_to_chain(ipt_mgr, ver,
'FORWARD', jump_rule)
'FORWARD', jump_rule)
# jump to DROP_ALL policy
chain_name = iptables_manager.get_chain_name(FWAAS_DEFAULT_CHAIN)
for router_fw_port in router_fw_ports:
intf_name = self._get_intf_name(if_prefix,
router_fw_port)
jump_rule = ['-o %s -j %s-%s' % (intf_name, bname, chain_name)]
jump_rule = ['-o {} -j {}-{}'.format(intf_name, bname, chain_name)]
self._add_rules_to_chain(ipt_mgr, IPV4, 'FORWARD', jump_rule)
self._add_rules_to_chain(ipt_mgr, IPV6, 'FORWARD', jump_rule)
@ -455,7 +456,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
for router_fw_port in router_fw_ports:
intf_name = self._get_intf_name(if_prefix,
router_fw_port)
jump_rule = ['-i %s -j %s-%s' % (intf_name, bname, chain_name)]
jump_rule = ['-i {} -j {}-{}'.format(intf_name, bname, chain_name)]
self._add_rules_to_chain(ipt_mgr, IPV4, 'FORWARD', jump_rule)
self._add_rules_to_chain(ipt_mgr, IPV6, 'FORWARD', jump_rule)

View File

@ -16,7 +16,7 @@ import abc
import contextlib
class FirewallL2DriverBase(object, metaclass=abc.ABCMeta):
class FirewallL2DriverBase(metaclass=abc.ABCMeta):
"""Abstract firewall L2 driver base"""
def __init__(self, integration_bridge, sg_enabled=False):

View File

@ -51,7 +51,7 @@ def _replace_register(flow_params, register_number, register_value):
try:
reg_port = flow_params[register_value]
del flow_params[register_value]
flow_params['reg{:d}'.format(register_number)] = reg_port
flow_params[f'reg{register_number:d}'] = reg_port
except KeyError:
pass
@ -64,7 +64,7 @@ def create_reg_numbers(flow_params):
_replace_register(flow_params, fwaas_ovs_consts.REG_NET, 'reg_net')
class FirewallGroup(object):
class FirewallGroup:
def __init__(self, id_):
self.id = id_
self.ingress_rules = []
@ -105,7 +105,7 @@ class FirewallGroup(object):
# NOTE(ivasilevskaya) That's a copy-paste from neutron ovsfw driver that
# differs only in firewall groups list field name
class OFPort(object):
class OFPort:
def __init__(self, port_dict, ovs_port, vlan_tag):
self.id = port_dict['device']
self.vlan_tag = vlan_tag
@ -157,7 +157,7 @@ class OFPort(object):
# NOTE(ivasilevskaya) That's a copy-paste from neutron ovsfw driver that
# differs in methods name [s/sg/fwg] and update_rules method.
class FWGPortMap(object):
class FWGPortMap:
def __init__(self):
self.ports = {}
self.fw_groups = {}
@ -253,7 +253,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase):
dl_type = kwargs.get('dl_type')
create_reg_numbers(kwargs)
if isinstance(dl_type, int):
kwargs['dl_type'] = "0x{:04x}".format(dl_type)
kwargs['dl_type'] = f"0x{dl_type:04x}"
if self._deferred:
self.int_br.add_flow(**kwargs)
else:
@ -667,7 +667,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase):
nw_proto=lib_const.PROTO_NUM_UDP,
tp_src=src_port,
tp_dst=dst_port,
actions='resubmit(,{:d})'.format(accept_or_ingress)
actions=f'resubmit(,{accept_or_ingress:d})'
)
# Ban dhcp service running on an instance
for dl_type, src_port, dst_port in (
@ -724,7 +724,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase):
fwaas_ovs_consts.FW_BASE_INGRESS_TABLE),
)
for ethertype in [lib_const.ETHERTYPE_IP,
lib_const.ETHERTYPE_IPV6]:
lib_const.ETHERTYPE_IPV6]:
self._add_flow(
table=fwaas_ovs_consts.FW_ACCEPT_OR_INGRESS_TABLE,
priority=90,
@ -807,7 +807,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase):
dl_type=lib_const.ETHERTYPE_IPV6,
nw_proto=lib_const.PROTO_NUM_IPV6_ICMP,
icmp_type=icmp_type,
actions='output:{:d}'.format(port.ofport)
actions=f'output:{port.ofport:d}'
)
# NOTE(ivasilevskaya) That's a copy-paste from neutron ovsfw driver
@ -819,7 +819,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase):
priority=100,
dl_type=lib_const.ETHERTYPE_ARP,
reg_port=port.ofport,
actions='output:{:d}'.format(port.ofport)
actions=f'output:{port.ofport:d}'
)
self._initialize_ingress_ipv6_icmp(port)
@ -835,7 +835,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase):
nw_proto=lib_const.PROTO_NUM_UDP,
tp_src=src_port,
tp_dst=dst_port,
actions='output:{:d}'.format(port.ofport)
actions=f'output:{port.ofport:d}'
)
# Track untracked
@ -888,7 +888,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase):
ct_state=state,
ct_mark=fwaas_ovs_consts.CT_MARK_NORMAL,
ct_zone=port.vlan_tag,
actions='output:{:d}'.format(port.ofport)
actions=f'output:{port.ofport:d}'
)
self._add_flow(
table=fwaas_ovs_consts.FW_RULES_INGRESS_TABLE,

View File

@ -87,7 +87,7 @@ def populate_flow_common(direction, flow_template, port):
"""Initialize common flow fields."""
if direction == n_consts.INGRESS_DIRECTION:
flow_template['table'] = fwaas_ovs_consts.FW_RULES_INGRESS_TABLE
flow_template['actions'] = "output:{:d}".format(port.ofport)
flow_template['actions'] = f"output:{port.ofport:d}"
elif direction == n_consts.EGRESS_DIRECTION:
flow_template['table'] = fwaas_ovs_consts.FW_RULES_EGRESS_TABLE
# Traffic can be both ingress and egress, check that no ingress rules
@ -121,10 +121,10 @@ def create_port_range_flows(flow_template, rule):
if protocol is None:
return []
flows = []
src_port_match = '{:s}_src'.format(protocol)
src_port_match = f'{protocol:s}_src'
src_port_min = rule.get('source_port_range_min')
src_port_max = rule.get('source_port_range_max')
dst_port_match = '{:s}_dst'.format(protocol)
dst_port_match = f'{protocol:s}_dst'
dst_port_min = rule.get('port_range_min')
dst_port_max = rule.get('port_range_max')

View File

@ -50,12 +50,12 @@ FWaaSOpts = [
cfg.CONF.register_opts(FWaaSOpts, 'fwaas')
class FWaaSPluginApiMixin(object):
class FWaaSPluginApiMixin:
"""Agent side of the FWaaS agent to FWaaS Plugin RPC API."""
def __init__(self, topic, host):
# NOTE(annp): Mixin class should call super
super(FWaaSPluginApiMixin, self).__init__()
super().__init__()
self.host = host
target = oslo_messaging.Target(topic=topic, version='1.0')
@ -74,12 +74,12 @@ class FWaaSPluginApiMixin(object):
firewall_id=firewall_id)
class FWaaSAgentRpcCallbackMixin(object):
class FWaaSAgentRpcCallbackMixin:
"""Mixin for FWaaS agent Implementations."""
def __init__(self, host):
super(FWaaSAgentRpcCallbackMixin, self).__init__(host)
super().__init__(host)
def create_firewall(self, context, firewall, host):
"""Handle RPC cast from plugin to create a firewall."""

View File

@ -25,7 +25,7 @@ LOG = logging.getLogger(__name__)
FIREWALL_DRIVERS = 'firewall_drivers'
class FirewallService(object):
class FirewallService:
"""Firewall Service observer."""
def load_device_drivers(self):

View File

@ -409,7 +409,7 @@ class FWaaSV2AgentExtension(l2_extension.L2AgentExtension):
self._send_fwg_status(context, fwg['id'], status, self.conf.host)
class PortFirewallGroupMap(object):
class PortFirewallGroupMap:
"""Store relations between Port and Firewall Group and trusted port
This map is used in deleting firewall_group because the firewall_group has

View File

@ -37,14 +37,14 @@ LOG = logging.getLogger(__name__)
class FWaaSL3PluginApi(api.FWaaSPluginApiMixin):
"""Agent side of the FWaaS agent-to-plugin RPC API."""
def __init__(self, topic, host):
super(FWaaSL3PluginApi, self).__init__(topic, host)
super().__init__(topic, host)
def get_firewall_groups_for_project(self, context, **kwargs):
"""Fetches a project's firewall groups from the plugin."""
LOG.debug("Fetch firewall groups from plugin")
cctxt = self.client.prepare()
return cctxt.call(context, 'get_firewall_groups_for_project',
host=self.host)
host=self.host)
def get_projects_with_firewall_groups(self, context, **kwargs):
"""Fetches from the plugin all projects that have firewall groups
@ -61,14 +61,14 @@ class FWaaSL3PluginApi(api.FWaaSPluginApiMixin):
LOG.debug("Notify plugin that firewall group has been deleted")
cctxt = self.client.prepare()
return cctxt.call(context, 'firewall_group_deleted', fwg_id=fwg_id,
host=self.host)
host=self.host)
def set_firewall_group_status(self, context, fwg_id, status, **kwargs):
"""Sets firewall group's status on the plugin."""
LOG.debug("Set firewall groups from plugin")
cctxt = self.client.prepare()
return cctxt.call(context, 'set_firewall_group_status',
fwg_id=fwg_id, status=status, host=self.host)
fwg_id=fwg_id, status=status, host=self.host)
class FWaaSL3AgentExtension(l3_extension.L3AgentExtension):
@ -125,7 +125,7 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension):
self.services_sync_needed = False
self.fwplugin_rpc = FWaaSL3PluginApi(fwaas_constants.FIREWALL_PLUGIN,
host)
super(FWaaSL3AgentExtension, self).__init__()
super().__init__()
@property
def _local_namespaces(self):
@ -141,7 +141,7 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension):
return 'add-port-ids' in firewall_group
def _get_firewall_group_ports(self, context, firewall_group,
to_delete=False, require_new_plugin=False):
to_delete=False, require_new_plugin=False):
"""Returns in-namespace ports, either from firewall group dict if ports
update or from project routers otherwise if only policies update.
@ -160,7 +160,7 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension):
for router in routers:
if router.router['tenant_id'] == firewall_group['tenant_id']:
fwg_port_ids.extend([p['id'] for p in
router.internal_ports])
router.internal_ports])
# Return in-namespace port objects.
return self._get_in_ns_ports(fwg_port_ids)
@ -260,9 +260,9 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension):
return
# Apply a firewall group, as requested, to ports on the new router.
all_router_ports = set(
all_router_ports = {
p['id'] for p in updated_router[nl_constants.INTERFACE_KEY]
)
}
processed_ports = set()
for firewall_group in fwg_list:
if not self._has_port_insertion_fields(firewall_group):
@ -368,8 +368,8 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension):
return
LOG.debug("Create firewall group %(fwg_id)s on ports: %(ports)s",
{'fwg_id': firewall_group['id'],
'ports': ', '.join([p for ri_ports in ports_for_fwg
{'fwg_id': firewall_group['id'],
'ports': ', '.join([p for ri_ports in ports_for_fwg
for p in ri_ports[1]])})
# Set firewall group status; will be overwritten if call to driver
@ -392,8 +392,8 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension):
# Send firewall group's status to plugin.
try:
self.fwplugin_rpc.set_firewall_group_status(context,
firewall_group['id'], status)
self.fwplugin_rpc.set_firewall_group_status(
context, firewall_group['id'], status)
except Exception:
msg = ("FWaaS RPC failure in create_firewall_group "
"for firewall group: %(fwg_id)s")
@ -485,8 +485,8 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension):
# Return status to plugin.
try:
self.fwplugin_rpc.set_firewall_group_status(context,
firewall_group['id'], status)
self.fwplugin_rpc.set_firewall_group_status(
context, firewall_group['id'], status)
except Exception:
LOG.exception("FWaaS RPC failure in update_firewall_group "
"for firewall group: %s", firewall_group['id'])
@ -528,11 +528,11 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension):
# plugin, as appropriate.
try:
if status in [nl_constants.ACTIVE, nl_constants.DOWN]:
self.fwplugin_rpc.firewall_group_deleted(context,
firewall_group['id'])
self.fwplugin_rpc.firewall_group_deleted(
context, firewall_group['id'])
else:
self.fwplugin_rpc.set_firewall_group_status(context,
firewall_group['id'], status)
self.fwplugin_rpc.set_firewall_group_status(
context, firewall_group['id'], status)
except Exception:
LOG.exception("FWaaS RPC failure in delete_firewall_group "
"for firewall group: %s", firewall_group['id'])
@ -549,4 +549,4 @@ class L3WithFWaaS(FWaaSL3AgentExtension):
self.conf = conf
else:
self.conf = cfg.CONF
super(L3WithFWaaS, self).__init__(host=self.conf.host, conf=self.conf)
super().__init__(host=self.conf.host, conf=self.conf)

View File

@ -31,7 +31,7 @@ from neutron_fwaas.db.firewall.v2 import firewall_db_v2
LOG = logging.getLogger(__name__)
class FirewallDriver(object, metaclass=abc.ABCMeta):
class FirewallDriver(metaclass=abc.ABCMeta):
"""Firewall v2 interface for driver
That driver interface does not persist Firewall v2 data in any database.
@ -131,7 +131,7 @@ class FirewallDriverDBMixin(FirewallDriver, metaclass=abc.ABCMeta):
"""
def __init__(self, *args, **kwargs):
super(FirewallDriverDBMixin, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.firewall_db = firewall_db_v2.FirewallPluginDb()
@staticmethod
@ -517,7 +517,7 @@ class FirewallDriverDB(FirewallDriverDBMixin):
pass
class FirewallDriverRPCMixin(object, metaclass=abc.ABCMeta):
class FirewallDriverRPCMixin(metaclass=abc.ABCMeta):
"""FirewallAgent interface for driver with rpc callback listener.
Each firewall backend driver that needs a rpc callback listener should

View File

@ -27,7 +27,7 @@ def acl_direction(direction, port_group=None):
portdir = 'inport'
else:
portdir = 'outport'
return '%s == @%s' % (portdir, port_group)
return '{} == @{}'.format(portdir, port_group)
def acl_ethertype(rule):
@ -48,8 +48,8 @@ def acl_ethertype(rule):
def acl_ip(rule, ip_version):
src_ip = rule.get('source_ip_address')
dst_ip = rule.get('destination_ip_address')
src = ' && %s.src == %s' % (ip_version, src_ip) if src_ip else ''
dst = ' && %s.dst == %s' % (ip_version, dst_ip) if dst_ip else ''
src = ' && {}.src == {}'.format(ip_version, src_ip) if src_ip else ''
dst = ' && {}.dst == {}'.format(ip_version, dst_ip) if dst_ip else ''
return src + dst

View File

@ -36,7 +36,7 @@ class OVNFwaasDriver(driver_api.FirewallDriverDB):
"""
def __init__(self, service_plugin):
super(OVNFwaasDriver, self).__init__(service_plugin)
super().__init__(service_plugin)
self._mech = None
def is_supported_l2_port(self, port):

View File

@ -64,7 +64,7 @@ def setup_logging():
LOG.logger.addHandler(syslog_handler)
class LogPrefix(object):
class LogPrefix:
"""LogPrefix could be used as prefix in NFLOG rules
Each of a couple (port_id, event) has its own LogPrefix object
"""
@ -99,7 +99,7 @@ class LogPrefix(object):
return not self.log_object_refs
class FWGPortLog(object):
class FWGPortLog:
"""A firewall group port log per log_object"""
def __init__(self, port_id, log_info):
@ -234,7 +234,7 @@ class IptablesLoggingDriver(log_ext.LoggingDriver):
# Start libnetfilter_log after router starting up
pid = libnflog.run_nflog(router_info.ns_name)
LOG.debug("NFLOG process ID %s for router %s has started",
pid, router_info.router_id)
pid, router_info.router_id)
self.nflog_proc_map[router_id] = pid
else:
# Handle the log request
@ -502,7 +502,7 @@ class IptablesLoggingDriver(log_ext.LoggingDriver):
def _generate_iptables_args(self, direction, device, prefix=None):
direction_config = ['-%s %s' %
(IPTABLES_DIRECTION_DEVICE[direction], device)]
(IPTABLES_DIRECTION_DEVICE[direction], device)]
match_rule = []
if self.rate_limit:
match_rule += [

View File

@ -23,7 +23,7 @@ FIREWALL_LOG_DRIVER_NAME = 'fwaas_v2_log'
class FWaaSL3LoggingExtension(base.L3LoggingExtensionBase,
l3_extension.L3AgentExtension):
l3_extension.L3AgentExtension):
def initialize(self, connection, driver_type):
"""Initialize L3 logging agent extension"""

View File

@ -34,7 +34,7 @@ class BaseFullStackTestCase(testlib_api.MySQLTestCaseMixin,
BUILD_WITH_MIGRATIONS = True
def setUp(self, environment):
super(BaseFullStackTestCase, self).setUp()
super().setUp()
tests_base.setup_test_logging(
cfg.CONF, DEFAULT_LOG_DIR, '%s.txt' % self.get_name())
@ -66,4 +66,4 @@ class BaseFullStackTestCase(testlib_api.MySQLTestCaseMixin,
def get_name(self):
class_name, test_name = self.id().split(".")[-2:]
return "%s.%s" % (class_name, test_name)
return "{}.{}".format(class_name, test_name)

View File

@ -38,7 +38,7 @@ class ClientFixture(fixtures.Fixture):
"""Manage and cleanup neutron resources."""
def __init__(self, client):
super(ClientFixture, self).__init__()
super().__init__()
self.client = client
def _create_resource(self, resource_type, spec):

View File

@ -33,7 +33,7 @@ class ConfigFixture(fixtures.Fixture):
is initializing a new instance of the class.
"""
def __init__(self, env_desc, host_desc, temp_dir, base_filename):
super(ConfigFixture, self).__init__()
super().__init__()
self.config = config_fixtures.ConfigDict()
self.env_desc = env_desc
self.host_desc = host_desc
@ -51,7 +51,7 @@ class NeutronConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir,
connection, rabbitmq_environment):
super(NeutronConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir, base_filename='neutron.conf')
service_plugins = ['router', 'trunk']
@ -90,7 +90,7 @@ class NeutronConfigFixture(ConfigFixture):
'bind_port': self.useFixture(
port.ExclusivePort(constants.PROTO_NAME_TCP)).port
})
super(NeutronConfigFixture, self)._setUp()
super()._setUp()
def _generate_host(self):
return utils.get_rand_name(prefix='host-')
@ -110,7 +110,7 @@ class NeutronConfigFixture(ConfigFixture):
class ML2ConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir, tenant_network_types):
super(ML2ConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir, base_filename='ml2_conf.ini')
mechanism_drivers = self.env_desc.mech_drivers
@ -141,7 +141,7 @@ class ML2ConfigFixture(ConfigFixture):
class OVSConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir, local_ip):
super(OVSConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir,
base_filename='openvswitch_agent.ini')
@ -182,7 +182,7 @@ class OVSConfigFixture(ConfigFixture):
'of_listen_port': self.useFixture(
port.ExclusivePort(constants.PROTO_NAME_TCP)).port
})
super(OVSConfigFixture, self)._setUp()
super()._setUp()
def _generate_bridge_mappings(self):
return 'physnet1:%s' % utils.get_rand_device_name(prefix='br-eth')
@ -213,7 +213,7 @@ class LinuxBridgeConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir, local_ip,
physical_device_name):
super(LinuxBridgeConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir,
base_filename="linuxbridge_agent.ini"
)
@ -255,7 +255,7 @@ class LinuxBridgeConfigFixture(ConfigFixture):
class L3ConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir, integration_bridge=None):
super(L3ConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir, base_filename='l3_agent.ini')
if host_desc.l2_agent_type == constants.AGENT_TYPE_OVS:
self._prepare_config_with_ovs_agent(integration_bridge)

View File

@ -28,7 +28,7 @@ from neutron.tests.fullstack.resources import config
from neutron.tests.fullstack.resources import process
class EnvironmentDescription(object):
class EnvironmentDescription:
"""A set of characteristics of an environment setup.
Does the setup, as a whole, support tunneling? How about l2pop?
@ -47,7 +47,7 @@ class EnvironmentDescription(object):
return self.network_type in ('vxlan', 'gre')
class HostDescription(object):
class HostDescription:
"""A set of characteristics of an environment Host.
What agents should the host spawn? What mode should each agent operate
@ -275,7 +275,7 @@ class Environment(fixtures.Fixture):
:param hosts_desc: A list of HostDescription instances.
"""
super(Environment, self).__init__()
super().__init__()
self.env_desc = env_desc
self.hosts_desc = hosts_desc
self.hosts = []

View File

@ -44,7 +44,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
def __init__(self, host, network_id, tenant_id, safe_client,
neutron_port=None, bridge_name=None):
super(FakeFullstackMachine, self).__init__()
super().__init__()
self.host = host
self.tenant_id = tenant_id
self.network_id = network_id
@ -53,7 +53,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
self.bridge_name = bridge_name
def _setUp(self):
super(FakeFullstackMachine, self)._setUp()
super()._setUp()
self.bridge = self._get_bridge()
@ -81,8 +81,8 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
self.neutron_port['id'],
{'port': {pbs.HOST_ID: self.host.hostname}})
self.addCleanup(self.safe_client.client.update_port,
self.neutron_port['id'],
{'port': {pbs.HOST_ID: ''}})
self.neutron_port['id'],
{'port': {pbs.HOST_ID: ''}})
def _get_bridge(self):
if self.bridge_name is None:
@ -107,7 +107,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
subnet_id = fixed_ip['subnet_id']
subnet = self.safe_client.client.show_subnet(subnet_id)
prefixlen = netaddr.IPNetwork(subnet['subnet']['cidr']).prefixlen
self._ip_cidr = '%s/%s' % (self._ip, prefixlen)
self._ip_cidr = '{}/{}'.format(self._ip, prefixlen)
# TODO(amuller): Support DHCP
self.port.addr.add(self.ip_cidr)
@ -152,7 +152,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
class FakeFullstackTrunkMachine(FakeFullstackMachine):
def __init__(self, trunk, *args, **kwargs):
super(FakeFullstackTrunkMachine, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.trunk = trunk
def add_vlan_interface(self, mac_address, ip_address, segmentation_id):

View File

@ -32,7 +32,7 @@ from neutron.tests.fullstack import base as fullstack_base
class ProcessFixture(fixtures.Fixture):
def __init__(self, test_name, process_name, exec_name, config_filenames,
namespace=None, kill_signal=signal.SIGKILL):
super(ProcessFixture, self).__init__()
super().__init__()
self.test_name = test_name
self.process_name = process_name
self.exec_name = exec_name
@ -52,7 +52,7 @@ class ProcessFixture(fixtures.Fixture):
common_utils.ensure_dir(log_dir)
timestamp = datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S-%f")
log_file = "%s--%s.log" % (self.process_name, timestamp)
log_file = "{}--{}.log".format(self.process_name, timestamp)
cmd = [shutil.which(self.exec_name),
'--log-dir', log_dir,
'--log-file', log_file]
@ -71,7 +71,7 @@ class ProcessFixture(fixtures.Fixture):
class RabbitmqEnvironmentFixture(fixtures.Fixture):
def __init__(self, host="127.0.0.1"):
super(RabbitmqEnvironmentFixture, self).__init__()
super().__init__()
self.host = host
def _setUp(self):
@ -100,7 +100,7 @@ class NeutronServerFixture(fixtures.Fixture):
def __init__(self, env_desc, host_desc,
test_name, neutron_cfg_fixture, plugin_cfg_fixture):
super(NeutronServerFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name
@ -140,7 +140,7 @@ class OVSAgentFixture(fixtures.Fixture):
def __init__(self, env_desc, host_desc,
test_name, neutron_cfg_fixture, agent_cfg_fixture):
super(OVSAgentFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name
@ -174,7 +174,7 @@ class LinuxBridgeAgentFixture(fixtures.Fixture):
def __init__(self, env_desc, host_desc, test_name,
neutron_cfg_fixture, agent_cfg_fixture,
namespace=None):
super(LinuxBridgeAgentFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name
@ -206,7 +206,7 @@ class L3AgentFixture(fixtures.Fixture):
def __init__(self, env_desc, host_desc, test_name,
neutron_cfg_fixture, l3_agent_cfg_fixture,
namespace=None):
super(L3AgentFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name

View File

@ -57,7 +57,7 @@ class TestL3Agent(base.BaseFullStackTestCase):
for cidr in subnet_cidrs:
# For IPv6 subnets, enable_dhcp should be set to true.
enable_dhcp = (netaddr.IPNetwork(cidr).version ==
constants.IP_VERSION_6)
constants.IP_VERSION_6)
subnet = self.safe_client.create_subnet(
tenant_id, network['id'], cidr, enable_dhcp=enable_dhcp)
@ -83,7 +83,7 @@ class TestLegacyL3Agent(TestL3Agent):
environment.EnvironmentDescription(
network_type='vlan', l2_pop=False),
host_descriptions)
super(TestLegacyL3Agent, self).setUp(env)
super().setUp(env)
def _get_namespace(self, router_id):
return namespaces.build_ns_name(l3_agent.NS_PREFIX, router_id)
@ -101,9 +101,9 @@ class TestLegacyL3Agent(TestL3Agent):
tenant_id, network['id'], '20.0.0.0/24', gateway_ip='20.0.0.1')
self.safe_client.add_router_interface(router['id'], subnet['id'])
namespace = "%s@%s" % (
namespace = "{}@{}".format(
self._get_namespace(router['id']),
self.environment.hosts[0].l3_agent.get_namespace_suffix(), )
self.environment.hosts[0].l3_agent.get_namespace_suffix())
self._assert_namespace_exists(namespace)
def test_east_west_traffic(self):
@ -159,7 +159,7 @@ class TestHAL3Agent(base.BaseFullStackTestCase):
environment.EnvironmentDescription(
network_type='vxlan', l2_pop=True),
host_descriptions)
super(TestHAL3Agent, self).setUp(env)
super().setUp(env)
def _is_ha_router_active_on_one_agent(self, router_id):
agents = self.client.list_l3_agent_hosting_routers(router_id)

View File

@ -61,7 +61,7 @@ class TestSanityCheck(testlib_api.SqlTestCaseLight):
BUILD_SCHEMA = False
def setUp(self):
super(TestSanityCheck, self).setUp()
super().setUp()
for conf in migration.get_alembic_configs():
self.alembic_config = conf

View File

@ -148,7 +148,7 @@ class NetlinkLibTestCaseIPv6(functional_base.BaseSudoTestCase):
)
def setUp(self):
super(NetlinkLibTestCaseIPv6, self).setUp()
super().setUp()
if not check_nf_conntrack_ipv6_is_loaded():
self.skipTest(
"nf_conntrack_ipv6 module wasn't loaded. Please load"

View File

@ -24,7 +24,7 @@ from neutron_fwaas.privileged.tests.functional import utils
class InNamespaceTest(base.BaseSudoTestCase):
def setUp(self):
super(InNamespaceTest, self).setUp()
super().setUp()
self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
ip = ip_lib.IPWrapper()

View File

@ -63,7 +63,7 @@ AGENT_MODE_OPTS = [
class FWLoggingTestBase(framework.L3AgentTestFramework):
def setUp(self):
super(FWLoggingTestBase, self).setUp()
super().setUp()
self.conf.register_opts(FWAAS_V2_LOG_OPTS, 'fwaas')
self.conf.register_opts(AGENT_MODE_OPTS, group='DEFAULT')
self._set_agent_mode(self.conf)
@ -123,7 +123,8 @@ class FWLoggingTestBase(framework.L3AgentTestFramework):
if not v4rules_in_chain:
ipt_mgr.ipv4['filter'].add_chain(chain)
if action == 'REJECT':
ipt_mgr.ipv4['filter'].add_rule(chain,
ipt_mgr.ipv4['filter'].add_rule(
chain,
'-j REJECT --reject-with icmp-port-unreachable')
else:
ipt_mgr.ipv4['filter'].add_rule(chain, '-j %s' % action)
@ -133,7 +134,8 @@ class FWLoggingTestBase(framework.L3AgentTestFramework):
if not v6rules_in_chain:
ipt_mgr.ipv6['filter'].add_chain(chain)
if action == 'REJECT':
ipt_mgr.ipv6['filter'].add_rule(chain,
ipt_mgr.ipv6['filter'].add_rule(
chain,
'-j REJECT --reject-with icmp6-port-unreachable')
else:
ipt_mgr.ipv6['filter'].add_rule(chain, '-j %s' % action)

View File

@ -24,7 +24,7 @@ from neutron_fwaas.tests import base
class TestChecks(base.BaseTestCase):
def setUp(self):
super(TestChecks, self).setUp()
super().setUp()
self.checks = checks.Checks()
def test_get_checks_list(self):

View File

@ -30,7 +30,7 @@ from neutron_fwaas.tests.unit.services.firewall import test_fwaas_plugin_v2
class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
def setUp(self):
super(TestFirewallDBPluginV2, self).setUp()
super().setUp()
self.db = self.plugin.driver.firewall_db
def test_get_policy_ordered_rules(self):
@ -877,8 +877,8 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
# make sure that admin can see default groups for admin and non-admin
res = self._list_req('firewall_groups', ctx=ctx_admin, as_admin=True)
self.assertEqual(2, len(res))
self.assertEqual(set([ctx_admin.tenant_id, ctx.tenant_id]),
set([r['tenant_id'] for r in res]))
self.assertEqual({ctx_admin.tenant_id, ctx.tenant_id},
{r['tenant_id'] for r in res})
def test_create_default_firewall_group_from_config(self):
group = 'default_fwg_rules'
@ -1026,10 +1026,10 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
description = "my_firewall1"
not_found_fwp_id = uuidutils.generate_uuid()
self._create_firewall_group(fmt, fwg_name,
description, not_found_fwp_id,
not_found_fwp_id, ports=None,
admin_state_up=self.ADMIN_STATE_UP,
expected_res_status=404)
description, not_found_fwp_id,
not_found_fwp_id, ports=None,
admin_state_up=self.ADMIN_STATE_UP,
expected_res_status=404)
def test_create_firewall_group_with_fwp_on_different_tenant(self):
fmt = self.fmt
@ -1041,9 +1041,9 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
ctx = self._get_nonadmin_context()
self._create_firewall_group(fmt, fwg_name,
description,
ingress_firewall_policy_id=fwp_id,
ingress_firewall_policy_id=fwp_id,
egress_firewall_policy_id=fwp_id,
context=ctx,
context=ctx,
expected_res_status=404)
def test_create_firewall_group_with_admin_and_fwp_different_tenant(self):
@ -1685,8 +1685,8 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase):
as_admin=True) as fwp:
fwp_id = fwp['firewall_policy']['id']
fwr_id = fwr['firewall_rule']['id']
msg = "Firewall rule {0} is not associated with " \
"firewall policy {1}.".format(fwr_id, fwp_id)
msg = "Firewall rule {} is not associated with " \
"firewall policy {}.".format(fwr_id, fwp_id)
result = self._rule_action(
'remove', fwp_id, fwr_id,
insert_before=None,

View File

@ -35,7 +35,7 @@ class NFLogAppTestCase(base.BaseTestCase):
self.nflog_app = lib_log.NFLogApp()
self.spawn = mock.patch('eventlet.spawn').start()
super(NFLogAppTestCase, self).setUp()
super().setUp()
def test_register_packet_handler(self):
def fake_method():
@ -55,7 +55,7 @@ class NFLogAppTestCase(base.BaseTestCase):
class NFLogWrapper(base.BaseTestCase):
def setUp(self):
super(NFLogWrapper, self).setUp()
super().setUp()
lib_log.libnflog = mock.Mock()
lib_log.ffi = mock.Mock()

View File

@ -40,7 +40,7 @@ FAKE_ICMPV6_ENTRY = {'ipversion': 6, 'protocol': 'ipv6-icmp',
class NetlinkLibTestCase(base.BaseTestCase):
def setUp(self):
super(NetlinkLibTestCase, self).setUp()
super().setUp()
nl_lib.nfct = mock.Mock()
nl_lib.libc = mock.Mock()
@ -250,8 +250,8 @@ class NetlinkLibTestCase(base.BaseTestCase):
with nl_lib.ConntrackManager() as conntrack:
nl_lib.nfct.nfct_open.assert_called_once()
conntrack.delete_entries([FAKE_ENTRY,
FAKE_TCP_ENTRY,
FAKE_UDP_ENTRY])
FAKE_TCP_ENTRY,
FAKE_UDP_ENTRY])
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_L3PROTO,

View File

@ -27,7 +27,7 @@ class InNamespaceTest(base.BaseTestCase):
NEW_NETNS = 'newns'
def setUp(self):
super(InNamespaceTest, self).setUp()
super().setUp()
# NOTE(cby): we should unmock os.open/close as early as possible
# because there are used in cleanups

View File

@ -23,7 +23,7 @@ from neutron_fwaas.tests import base
class TestNoopDriver(base.BaseTestCase):
def setUp(self):
super(TestNoopDriver, self).setUp()
super().setUp()
mock_br = mock.Mock()
self.firewall = noop_driver.NoopFirewallL2Driver(mock_br)

View File

@ -48,15 +48,15 @@ class TestCreateRegNumbers(base.BaseTestCase):
def test_both_registers_defined(self):
flow = {'foo': 'bar', 'reg_port': 1, 'reg_net': 2}
expected_flow = {'foo': 'bar',
'reg{:d}'.format(fwaas_ovs_consts.REG_PORT): 1,
'reg{:d}'.format(fwaas_ovs_consts.REG_NET): 2}
f'reg{fwaas_ovs_consts.REG_PORT:d}': 1,
f'reg{fwaas_ovs_consts.REG_NET:d}': 2}
ovsfw.create_reg_numbers(flow)
self.assertEqual(expected_flow, flow)
class TestFirewallGroup(base.BaseTestCase):
def setUp(self):
super(TestFirewallGroup, self).setUp()
super().setUp()
self.fwg = ovsfw.FirewallGroup('123')
self.fwg.members = {'type': [1, 2, 3, 4]}
@ -103,7 +103,7 @@ class TestFirewallGroup(base.BaseTestCase):
class TestOFPort(base.BaseTestCase):
def setUp(self):
super(TestOFPort, self).setUp()
super().setUp()
self.ipv4_addresses = ['10.0.0.1', '192.168.0.1']
self.ipv6_addresses = ['fe80::f816:3eff:fe2e:1']
port_dict = {'device': 1,
@ -165,7 +165,7 @@ class TestOFPort(base.BaseTestCase):
class TestFWGPortMap(base.BaseTestCase):
def setUp(self):
super(TestFWGPortMap, self).setUp()
super().setUp()
self.map = ovsfw.FWGPortMap()
def test_get_or_create_fwg_existing_fwg(self):
@ -250,7 +250,7 @@ class TestFWGPortMap(base.BaseTestCase):
self.map.update_members(42, [])
class FakeOVSPort(object):
class FakeOVSPort:
def __init__(self, name, port, mac):
self.port_name = name
self.ofport = port
@ -259,7 +259,7 @@ class FakeOVSPort(object):
class TestOVSFirewallDriver(base.BaseTestCase):
def setUp(self):
super(TestOVSFirewallDriver, self).setUp()
super().setUp()
self._mock_ovs_br = mock.patch.object(
ovs_lib, 'OVSBridge', autospec=True)
mock_bridge = self._mock_ovs_br.start()
@ -319,8 +319,8 @@ class TestOVSFirewallDriver(base.BaseTestCase):
def test__add_flow_registers_are_replaced(self):
self.firewall._add_flow(in_port=1, reg_port=1, reg_net=2)
expected_calls = {'in_port': 1,
'reg{:d}'.format(fwaas_ovs_consts.REG_PORT): 1,
'reg{:d}'.format(fwaas_ovs_consts.REG_NET): 2}
f'reg{fwaas_ovs_consts.REG_PORT:d}': 1,
f'reg{fwaas_ovs_consts.REG_NET:d}': 2}
self.mock_bridge.br.add_flow.assert_called_once_with(
**expected_calls)
@ -444,7 +444,7 @@ class TestOVSFirewallDriver(base.BaseTestCase):
'output:{:d},resubmit(,{:d})'.format(
self.port_ofport,
ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE),
dl_type="0x{:04x}".format(constants.ETHERTYPE_IP),
dl_type=f"0x{constants.ETHERTYPE_IP:04x}",
nw_proto=constants.PROTO_NUM_TCP,
priority=70,
reg5=self.port_ofport,
@ -482,8 +482,8 @@ class TestOVSFirewallDriver(base.BaseTestCase):
priority=95,
table=ovs_consts.TRANSIENT_TABLE)
filter_rule = mock.call(
actions='resubmit(,{:d})'.format(ovs_consts.RULES_INGRESS_TABLE),
dl_type="0x{:04x}".format(constants.ETHERTYPE_IP),
actions=f'resubmit(,{ovs_consts.RULES_INGRESS_TABLE:d})',
dl_type=f"0x{constants.ETHERTYPE_IP:04x}",
nw_proto=constants.PROTO_NUM_TCP,
priority=70,
reg5=self.port_ofport,
@ -529,7 +529,7 @@ class TestOVSFirewallDriver(base.BaseTestCase):
mock.call(
actions='resubmit(,{:d})'.format(
fwaas_ovs_consts.FW_ACCEPT_OR_INGRESS_TABLE),
dl_type="0x{:04x}".format(constants.ETHERTYPE_IP),
dl_type=f"0x{constants.ETHERTYPE_IP:04x}",
nw_proto=constants.PROTO_NUM_UDP,
priority=71,
ct_state=fwaas_ovs_consts.OF_STATE_NEW_NOT_ESTABLISHED,
@ -567,7 +567,7 @@ class TestOVSFirewallDriver(base.BaseTestCase):
mock.call(
actions='resubmit(,{:d})'.format(
ovs_consts.RULES_EGRESS_TABLE),
dl_type="0x{:04x}".format(constants.ETHERTYPE_IP),
dl_type=f"0x{constants.ETHERTYPE_IP:04x}",
nw_proto=constants.PROTO_NUM_UDP,
priority=71,
ct_state=fwaas_ovs_consts.OF_STATE_NEW_NOT_ESTABLISHED,
@ -638,7 +638,7 @@ class TestOVSFirewallDriver(base.BaseTestCase):
self._prepare_firewall_group()
self.firewall.fwg_to_delete = {1}
with mock.patch.object(self.firewall.fwg_port_map,
'delete_fwg') as delete_fwg_mock:
'delete_fwg') as delete_fwg_mock:
self.firewall._cleanup_stale_fwg()
delete_fwg_mock.assert_called_once_with(1)

View File

@ -52,7 +52,7 @@ class TestIsValidPrefix(base.BaseTestCase):
class TestCreateFlowsFromRuleAndPort(base.BaseTestCase):
def setUp(self):
super(TestCreateFlowsFromRuleAndPort, self).setUp()
super().setUp()
ovs_port = mock.Mock(vif_mac='00:00:00:00:00:00')
ovs_port.ofport = 1
port_dict = {'device': 'port_id'}
@ -169,7 +169,7 @@ class TestCreateFlowsFromRuleAndPort(base.BaseTestCase):
class TestCreateProtocolFlows(base.BaseTestCase):
def setUp(self):
super(TestCreateProtocolFlows, self).setUp()
super().setUp()
ovs_port = mock.Mock(vif_mac='00:00:00:00:00:00')
ovs_port.ofport = 1
port_dict = {'device': 'port_id'}

View File

@ -38,7 +38,7 @@ MAX_INTF_NAME_LEN = 14
class IptablesFwaasTestCase(base.BaseTestCase):
def setUp(self):
super(IptablesFwaasTestCase, self).setUp()
super().setUp()
self.iptables_cls_p = mock.patch(
'neutron.agent.linux.iptables_manager.IptablesManager')
self.iptables_cls_p.start()
@ -124,7 +124,7 @@ class IptablesFwaasTestCase(base.BaseTestCase):
return fw_inst
def _fake_apply_list(self, router_count=1, distributed=False,
distributed_mode=None):
distributed_mode=None):
apply_list = []
while router_count > 0:
iptables_inst = mock.Mock()
@ -149,12 +149,13 @@ class IptablesFwaasTestCase(base.BaseTestCase):
return apply_list
def _get_intf_name(self, if_prefix, port_id):
_name = "%s%s" % (if_prefix, port_id)
_name = "{}{}".format(if_prefix, port_id)
return _name[:MAX_INTF_NAME_LEN]
def _setup_firewall_with_rules(self, func, router_count=1,
distributed=False, distributed_mode=None):
apply_list = self._fake_apply_list(router_count=router_count,
distributed=False, distributed_mode=None):
apply_list = self._fake_apply_list(
router_count=router_count,
distributed=distributed, distributed_mode=distributed_mode)
rule_list = self._fake_rules_v4(FAKE_FW_ID, apply_list)
firewall = self._fake_firewall(rule_list)
@ -178,8 +179,8 @@ class IptablesFwaasTestCase(base.BaseTestCase):
rule3 = '-p tcp -m tcp --dport 23 -j %s' % rejected
ingress_chain = 'iv4%s' % firewall['id']
egress_chain = 'ov4%s' % firewall['id']
ipt_mgr_ichain = '%s-%s' % (binary_name, ingress_chain[:11])
ipt_mgr_echain = '%s-%s' % (binary_name, egress_chain[:11])
ipt_mgr_ichain = '{}-{}'.format(binary_name, ingress_chain[:11])
ipt_mgr_echain = '{}-{}'.format(binary_name, egress_chain[:11])
for router_info_inst, port_ids in apply_list:
v4filter_inst = router_info_inst.iptables_manager.ipv4['filter']
calls = [mock.call.remove_chain('iv4fake-fw-uuid'),
@ -204,24 +205,29 @@ class IptablesFwaasTestCase(base.BaseTestCase):
for port in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port)
calls.append(mock.call.add_rule('FORWARD',
'-o %s -j %s' % (intf_name, ipt_mgr_ichain)))
calls.append(mock.call.add_rule(
'FORWARD',
'-o {} -j {}'.format(intf_name, ipt_mgr_ichain)))
for port in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port)
calls.append(mock.call.add_rule('FORWARD',
'-i %s -j %s' % (intf_name, ipt_mgr_echain)))
calls.append(mock.call.add_rule(
'FORWARD',
'-i {} -j {}'.format(intf_name, ipt_mgr_echain)))
for direction in ['o', 'i']:
for port_id in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port_id)
calls.append(mock.call.add_rule('FORWARD',
'-%s %s -j %s-fwaas-defau' % (direction,
intf_name, binary_name)))
calls.append(mock.call.add_rule(
'FORWARD',
'-{} {} -j {}-fwaas-defau'.format(
direction, intf_name, binary_name)))
v4filter_inst.assert_has_calls(calls)
def _setup_firewall_with_rules_v6(self, func, router_count=1,
def _setup_firewall_with_rules_v6(
self, func, router_count=1,
distributed=False, distributed_mode=None):
apply_list = self._fake_apply_list(router_count=router_count,
apply_list = self._fake_apply_list(
router_count=router_count,
distributed=distributed, distributed_mode=distributed_mode)
rule_list = self._fake_rules_v6(FAKE_FW_ID, apply_list)
firewall = self._fake_firewall(rule_list)
@ -242,8 +248,8 @@ class IptablesFwaasTestCase(base.BaseTestCase):
rule1 = '-p ipv6-icmp -d 2001:db8::2/128 -j %s' % accepted
ingress_chain = 'iv6%s' % firewall['id']
egress_chain = 'ov6%s' % firewall['id']
ipt_mgr_ichain = '%s-%s' % (binary_name, ingress_chain[:11])
ipt_mgr_echain = '%s-%s' % (binary_name, egress_chain[:11])
ipt_mgr_ichain = '{}-{}'.format(binary_name, ingress_chain[:11])
ipt_mgr_echain = '{}-{}'.format(binary_name, egress_chain[:11])
for router_info_inst, port_ids in apply_list:
v6filter_inst = router_info_inst.iptables_manager.ipv6['filter']
calls = [mock.call.remove_chain('iv6fake-fw-uuid'),
@ -263,19 +269,23 @@ class IptablesFwaasTestCase(base.BaseTestCase):
]
for port in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port)
calls.append(mock.call.add_rule('FORWARD',
'-o %s -j %s' % (intf_name, ipt_mgr_ichain)))
calls.append(mock.call.add_rule(
'FORWARD',
'-o {} -j {}'.format(intf_name, ipt_mgr_ichain)))
for port in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port)
calls.append(mock.call.add_rule('FORWARD',
'-i %s -j %s' % (intf_name, ipt_mgr_echain)))
calls.append(mock.call.add_rule(
'FORWARD',
'-i {} -j {}'.format(intf_name, ipt_mgr_echain)))
for direction in ['o', 'i']:
for port_id in FAKE_PORT_IDS:
intf_name = self._get_intf_name(if_prefix, port_id)
calls.append(mock.call.add_rule('FORWARD',
'-%s %s -j %s-fwaas-defau' % (direction,
intf_name, binary_name)))
calls.append(mock.call.add_rule(
'FORWARD',
'-{} {} -j {}-fwaas-defau'.format(direction,
intf_name,
binary_name)))
v6filter_inst.assert_has_calls(calls)
@ -289,8 +299,8 @@ class IptablesFwaasTestCase(base.BaseTestCase):
invalid_rule = '-m state --state INVALID -j %s' % dropped
est_rule = '-m state --state RELATED,ESTABLISHED -j ACCEPT'
for ip_version in (4, 6):
ingress_chain = ('iv%s%s' % (ip_version, firewall['id']))
egress_chain = ('ov%s%s' % (ip_version, firewall['id']))
ingress_chain = ('iv{}{}'.format(ip_version, firewall['id']))
egress_chain = ('ov{}{}'.format(ip_version, firewall['id']))
calls = [mock.call.remove_chain(
'iv%sfake-fw-uuid' % ip_version),
mock.call.remove_chain(
@ -308,10 +318,11 @@ class IptablesFwaasTestCase(base.BaseTestCase):
for port_id in FAKE_PORT_IDS:
for direction in ['o', 'i']:
mock.call.add_rule('FORWARD',
'-%s qr-%s -j %s-fwaas-defau' % (port_id,
direction,
binary_name))
mock.call.add_rule(
'FORWARD',
'-{} qr-{} -j {}-fwaas-defau'.format(port_id,
direction,
binary_name))
if ip_version == 4:
v4filter_inst = first_ri.iptables_manager.ipv4['filter']
v4filter_inst.assert_has_calls(calls)
@ -377,19 +388,23 @@ class IptablesFwaasTestCase(base.BaseTestCase):
first_ri.iptables_manager.ipv4['filter'].assert_has_calls(calls)
def test_create_firewall_group_with_rules_dvr_snat(self):
self._setup_firewall_with_rules(self.firewall.create_firewall_group,
self._setup_firewall_with_rules(
self.firewall.create_firewall_group,
distributed=True, distributed_mode='dvr_snat')
def test_update_firewall_group_with_rules_dvr_snat(self):
self._setup_firewall_with_rules(self.firewall.update_firewall_group,
self._setup_firewall_with_rules(
self.firewall.update_firewall_group,
distributed=True, distributed_mode='dvr_snat')
def test_create_firewall_group_with_rules_dvr(self):
self._setup_firewall_with_rules(self.firewall.create_firewall_group,
self._setup_firewall_with_rules(
self.firewall.create_firewall_group,
distributed=True, distributed_mode='dvr')
def test_update_firewall_group_with_rules_dvr(self):
self._setup_firewall_with_rules(self.firewall.update_firewall_group,
self._setup_firewall_with_rules(
self.firewall.update_firewall_group,
distributed=True, distributed_mode='dvr')
def test_remove_conntrack_new_firewall(self):
@ -408,10 +423,10 @@ class IptablesFwaasTestCase(base.BaseTestCase):
self.firewall.create_firewall_group(FW_LEGACY, apply_list, firewall)
self.firewall.pre_firewall = dict(firewall)
insert_rule = {'enabled': True,
'action': 'deny',
'ip_version': 4,
'protocol': 'icmp',
'id': 'fake-fw-rule'}
'action': 'deny',
'ip_version': 4,
'protocol': 'icmp',
'id': 'fake-fw-rule'}
rule_list.insert(2, insert_rule)
firewall = self._fake_firewall(rule_list)
self.firewall.update_firewall_group(FW_LEGACY, apply_list, firewall)
@ -492,10 +507,10 @@ class IptablesFwaasTestCase(base.BaseTestCase):
firewall = self._fake_firewall(rule_list)
self.firewall.create_firewall_group(FW_LEGACY, apply_list, firewall)
income_rule = {'enabled': True,
'action': 'deny',
'ip_version': 4,
'protocol': 'tcp',
'id': 'fake-fw-rule3'}
'action': 'deny',
'ip_version': 4,
'protocol': 'tcp',
'id': 'fake-fw-rule3'}
rule_list[2] = income_rule
firewall = self._fake_firewall(rule_list)
self.firewall.update_firewall_group(FW_LEGACY, apply_list, firewall)

View File

@ -96,7 +96,7 @@ ROUTER_NAMESPACE = 'qrouter-fake-namespace'
class ConntrackLegacyTestCase(base.BaseTestCase):
def setUp(self):
super(ConntrackLegacyTestCase, self).setUp()
super().setUp()
self.utils_exec = mock.Mock()
self.conntrack_driver = legacy_conntrack.ConntrackLegacy()
self.conntrack_driver.initialize(execute=self.utils_exec)
@ -135,7 +135,7 @@ class ConntrackLegacyTestCase(base.BaseTestCase):
self.conntrack_driver._execute_command = mock.Mock(
side_effect=get_contrack_entries)
entries = self.conntrack_driver.list_entries(ROUTER_NAMESPACE)
protocols = set([entry[1] for entry in entries])
protocols = {entry[1] for entry in entries}
supported_protocols = set(legacy_conntrack.ATTR_POSITIONS.keys())
self.assertTrue(protocols.issubset(supported_protocols))

View File

@ -69,7 +69,7 @@ ROUTER_NAMESPACE = 'qrouter-fake-namespace'
class ConntrackNetlinkTestCase(base.BaseTestCase):
def setUp(self):
super(ConntrackNetlinkTestCase, self).setUp()
super().setUp()
self.conntrack_driver = netlink_conntrack.ConntrackNetlink()
self.conntrack_driver.initialize()
nl_flush_entries = mock.patch('neutron_fwaas.privileged.'
@ -79,7 +79,7 @@ class ConntrackNetlinkTestCase(base.BaseTestCase):
'netlink_lib.list_entries')
self.list_entries = nl_list_entries.start()
nl_delete_entries = mock.patch('neutron_fwaas.privileged.'
'netlink_lib.delete_entries')
'netlink_lib.delete_entries')
self.delete_entries = nl_delete_entries.start()
def test_flush_entries(self):

View File

@ -34,10 +34,10 @@ PORT4 = uuidutils.generate_uuid()
HOST = 'fake_host'
class FakeFWaaSL2Agent(object):
class FakeFWaaSL2Agent:
def __init__(self):
super(FakeFWaaSL2Agent, self).__init__()
super().__init__()
def create(self, resource, attrs=None, minimal=False):
"""Create a fake fwaas v2 resources

View File

@ -31,7 +31,7 @@ from neutron_fwaas.tests.unit.services.firewall.service_drivers.agents.l2\
class TestFWaasV2AgentExtensionBase(base.BaseTestCase):
def setUp(self):
super(TestFWaasV2AgentExtensionBase, self).setUp()
super().setUp()
self.fake = fake_data.FakeFWaaSL2Agent()
self.port = self.fake.create('port')
@ -60,7 +60,7 @@ class TestFWaasV2AgentExtensionBase(base.BaseTestCase):
class TestFWaasV2AgentExtension(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestFWaasV2AgentExtension, self).setUp()
super().setUp()
cfg.CONF.set_override('firewall_l2_driver', 'ovs', group='fwaas')
def test_initialize(self):
@ -77,7 +77,7 @@ class TestFWaasV2AgentExtension(TestFWaasV2AgentExtensionBase):
class TestHandlePort(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestHandlePort, self).setUp()
super().setUp()
self.rpc.get_firewall_group_for_port = mock.Mock(
return_value=self.fwg)
self.l2._compute_status = mock.Mock(return_value=nl_consts.ACTIVE)
@ -154,7 +154,7 @@ class TestHandlePort(TestFWaasV2AgentExtensionBase):
class TestDeletePort(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestDeletePort, self).setUp()
super().setUp()
self.l2._compute_status = mock.Mock(return_value=nl_consts.ACTIVE)
self.l2._apply_fwg_rules = mock.Mock(return_value=True)
self.l2._send_fwg_status = mock.Mock()
@ -222,7 +222,7 @@ class TestDeletePort(TestFWaasV2AgentExtensionBase):
class TestCreateFirewallGroup(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestCreateFirewallGroup, self).setUp()
super().setUp()
self.l2._apply_fwg_rules = mock.Mock(return_value=True)
self.l2._compute_status = mock.Mock(return_value='ACTIVE')
self.l2._send_fwg_status = mock.Mock()
@ -304,7 +304,7 @@ class TestCreateFirewallGroup(TestFWaasV2AgentExtensionBase):
class TestDeleteFirewallGroup(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestDeleteFirewallGroup, self).setUp()
super().setUp()
self.l2._apply_fwg_rules = mock.Mock(return_value=True)
self.l2._compute_status = mock.Mock(return_value='ACTIVE')
self.l2._send_fwg_status = mock.Mock()
@ -360,7 +360,7 @@ class TestDeleteFirewallGroup(TestFWaasV2AgentExtensionBase):
class TestUpdateFirewallGroup(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestUpdateFirewallGroup, self).setUp()
super().setUp()
self.l2._delete_firewall_group = mock.Mock()
self.l2._create_firewall_group = mock.Mock()
self.l2._send_fwg_status = mock.Mock()
@ -394,20 +394,20 @@ class TestUpdateFirewallGroup(TestFWaasV2AgentExtensionBase):
class TestIsPortLayer2(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestIsPortLayer2, self).setUp()
super().setUp()
def test_vm_port(self):
self.assertTrue(self.l2._is_port_layer2(self.port))
def test_not_vm_port(self):
for device_owner in [nl_consts.DEVICE_OWNER_ROUTER_INTF,
nl_consts.DEVICE_OWNER_ROUTER_GW,
nl_consts.DEVICE_OWNER_DHCP,
nl_consts.DEVICE_OWNER_DVR_INTERFACE,
nl_consts.DEVICE_OWNER_AGENT_GW,
nl_consts.DEVICE_OWNER_ROUTER_SNAT,
'unknown device_owner',
'']:
nl_consts.DEVICE_OWNER_ROUTER_GW,
nl_consts.DEVICE_OWNER_DHCP,
nl_consts.DEVICE_OWNER_DVR_INTERFACE,
nl_consts.DEVICE_OWNER_AGENT_GW,
nl_consts.DEVICE_OWNER_ROUTER_SNAT,
'unknown device_owner',
'']:
self.port['device_owner'] = device_owner
self.assertFalse(self.l2._is_port_layer2(self.port))
@ -419,7 +419,7 @@ class TestIsPortLayer2(TestFWaasV2AgentExtensionBase):
class TestComputeStatus(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestComputeStatus, self).setUp()
super().setUp()
self.ports = list(self.fwg_with_rule['port_details'].values())
def test_normal(self):
@ -530,9 +530,9 @@ class TestComputeStatus(TestFWaasV2AgentExtensionBase):
class TestApplyFwgRules(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestApplyFwgRules, self).setUp()
super().setUp()
class DummyVlan(object):
class DummyVlan:
def __init__(self, vlan=None):
self.vlan = vlan
@ -591,7 +591,7 @@ class TestApplyFwgRules(TestFWaasV2AgentExtensionBase):
class TestSendFwgStatus(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestSendFwgStatus, self).setUp()
super().setUp()
self.rpc.set_firewall_group_status = mock.Mock()
def test_success(self):
@ -607,9 +607,9 @@ class TestSendFwgStatus(TestFWaasV2AgentExtensionBase):
class TestAddLocalVlanToPorts(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestAddLocalVlanToPorts, self).setUp()
super().setUp()
class DummyVlan(object):
class DummyVlan:
def __init__(self, vlan=None):
self.vlan = vlan
@ -671,7 +671,7 @@ class TestAddLocalVlanToPorts(TestFWaasV2AgentExtensionBase):
class TestFWaaSL2PluginApi(TestFWaasV2AgentExtensionBase):
def setUp(self):
super(TestFWaaSL2PluginApi, self).setUp()
super().setUp()
self.plugin = fwaas_v2.FWaaSL2PluginApi(
consts.FIREWALL_PLUGIN, self.host)
@ -710,7 +710,7 @@ class TestFWaaSL2PluginApi(TestFWaasV2AgentExtensionBase):
class TestPortFirewallGroupMap(base.BaseTestCase):
def setUp(self):
super(TestPortFirewallGroupMap, self).setUp()
super().setUp()
self.fake = fake_data.FakeFWaaSL2Agent()
self.map = fwaas_v2.PortFirewallGroupMap()
self.fwg = self.fake.create('fwg')

View File

@ -35,7 +35,7 @@ from neutron_fwaas.tests.unit.services.firewall.service_drivers.agents \
import test_firewall_agent_api
class FWaasHelper(object):
class FWaasHelper:
def __init__(self):
pass
@ -62,7 +62,7 @@ def _setup_test_agent_class(service_plugins):
self.event_observers = mock.Mock()
self.conf = conf
firewall_agent_api._check_required_agent_extension = mock.Mock()
super(FWaasTestAgent, self).__init__(conf)
super().__init__(conf)
def delete_router(self, context, data):
pass
@ -72,7 +72,7 @@ def _setup_test_agent_class(service_plugins):
class TestFWaaSL3AgentExtension(base.BaseTestCase):
def setUp(self):
super(TestFWaaSL3AgentExtension, self).setUp()
super().setUp()
self.conf = cfg.ConfigOpts()
self.conf.register_opts(l3_config.OPTS)
@ -129,10 +129,10 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase):
mock_driver_create_firewall_group.return_value = True
self.api.create_firewall_group(self.context, firewall_group,
host='host')
host='host')
mock_get_firewall_group_ports.assert_called_once_with(self.context,
firewall_group)
mock_get_firewall_group_ports.assert_called_once_with(
self.context, firewall_group)
mock_get_in_ns_ports.assert_called
assert mock_get_in_ns_ports
mock_set_firewall_group_status.assert_called_once_with(
@ -145,7 +145,7 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase):
'add-port-ids': [1, 2],
'del-port-ids': [3, 4],
'router_ids': [],
'last-port': False}
'last-port': False}
self.api.plugin_rpc = mock.Mock()
with mock.patch.object(self.api, '_get_firewall_group_ports'
@ -170,10 +170,10 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase):
mock.call(self.context, firewall_group)]
self.api.update_firewall_group(self.context, firewall_group,
host='host')
host='host')
self.assertEqual(mock_get_firewall_group_ports.call_args_list,
calls)
calls)
mock_get_in_ns_ports.assert_called
mock_set_firewall_group_status.assert_called_once_with(
self.context, firewall_group['id'], 'ACTIVE')
@ -202,7 +202,7 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase):
mock_driver_update_firewall_group.return_value = True
self.api.update_firewall_group(self.context, firewall_group,
host='host')
host='host')
mock_get_firewall_group_ports.assert_called
mock_get_in_ns_ports.assert_called
@ -232,7 +232,7 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase):
mock_driver_delete_firewall_group.return_value = True
self.api.update_firewall_group(self.context, firewall_group,
host='host')
host='host')
calls = [
mock.call._get_firewall_group_ports(
self.context, firewall_group, require_new_plugin=True,
@ -268,7 +268,7 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase):
mock_driver_update_firewall_group.return_value = True
self.api.update_firewall_group(self.context, firewall_group,
host='host')
host='host')
calls = [
mock.call._get_firewall_group_ports(
self.context, firewall_group, require_new_plugin=True,
@ -304,7 +304,7 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase):
mock_driver_update_firewall_group.return_value = True
self.api.update_firewall_group(self.context, firewall_group,
host='host')
host='host')
calls = [
mock.call._get_firewall_group_ports(
self.context, firewall_group, require_new_plugin=True,
@ -413,13 +413,13 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase):
mock_driver_delete_firewall_group.return_value = True
self.api.delete_firewall_group(self.context, firewall_group,
host='host')
host='host')
mock_get_firewall_group_ports.assert_called_once_with(
self.context, firewall_group, to_delete=True)
mock_get_in_ns_ports.assert_called
mock_firewall_group_deleted.assert_called_once_with(self.context,
firewall_group['id'])
mock_firewall_group_deleted.assert_called_once_with(
self.context, firewall_group['id'])
def _prepare_router_data(self):
return router_info.RouterInfo(self.api,

View File

@ -60,7 +60,7 @@ class FakeAgentApi(agents.FirewallAgentCallbacks):
class TestFirewallAgentApi(base.BaseTestCase):
def setUp(self):
super(TestFirewallAgentApi, self).setUp()
super().setUp()
self.api = agents.FirewallAgentApi('topic', 'host')
@ -109,7 +109,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
'TestL3NatServicePlugin')
l3_plugin = {'l3_plugin_name': l3_plugin_str}
common_conf.register_core_common_config_opts(cfg=cfg.CONF)
super(TestAgentDriver, self).setUp(
super().setUp(
service_provider=FIREWALL_AGENT_PLUGIN_KLASS,
extra_service_plugins=l3_plugin,
extra_extension_paths=neutron_extensions.__path__)
@ -132,7 +132,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
def _get_test_firewall_group_attrs(self, name,
status=nl_constants.INACTIVE):
return super(TestAgentDriver, self)._get_test_firewall_group_attrs(
return super()._get_test_firewall_group_attrs(
name, status=status)
def test_set_firewall_group_status(self):
@ -144,13 +144,13 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
admin_state_up=self.ADMIN_STATE_UP
) as fwg:
fwg_id = fwg['firewall_group']['id']
res = self.callbacks.set_firewall_group_status(ctx, fwg_id,
nl_constants.ACTIVE)
res = self.callbacks.set_firewall_group_status(
ctx, fwg_id, nl_constants.ACTIVE)
fwg_db = self.plugin.get_firewall_group(ctx, fwg_id)
self.assertEqual(nl_constants.ACTIVE, fwg_db['status'])
self.assertTrue(res)
res = self.callbacks.set_firewall_group_status(ctx, fwg_id,
nl_constants.ERROR)
res = self.callbacks.set_firewall_group_status(
ctx, fwg_id, nl_constants.ERROR)
fwg_db = self.plugin.get_firewall_group(ctx, fwg_id)
self.assertEqual(nl_constants.ERROR, fwg_db['status'])
self.assertFalse(res)
@ -247,12 +247,12 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
egress_firewall_policy_id=fwp_id,
admin_state_up=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
def test_create_firewall_group_with_ports(self):
"""neutron firewall_group create test-policy """
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id) as r, \
tenant_id=self._tenant_id) as r, \
self.subnet() as s1, \
self.subnet(cidr='20.0.0.0/24') as s2:
@ -278,12 +278,12 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
egress_firewall_policy_id=fwp_id, ports=fwg_ports,
admin_state_up=True) as fwg1:
self.assertEqual(nl_constants.PENDING_CREATE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
def test_create_firewall_group_with_ports_on_diff_routers(self):
"""neutron firewall_group create test-policy """
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id) as r, \
tenant_id=self._tenant_id) as r, \
self.subnet() as s1, \
self.subnet(cidr='20.0.0.0/24') as s2:
body = self._router_interface_action(
@ -300,7 +300,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
port_id2 = body['port_id']
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id) as r2, \
tenant_id=self._tenant_id) as r2, \
self.subnet() as s3:
body = self._router_interface_action(
@ -320,12 +320,12 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
ports=fwg_ports,
admin_state_up=True) as fwg1:
self.assertEqual(nl_constants.PENDING_CREATE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
def test_create_firewall_group_with_ports_no_policy(self):
"""neutron firewall_group create test-policy """
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id) as r, \
tenant_id=self._tenant_id) as r, \
self.subnet() as s1, \
self.subnet(cidr='20.0.0.0/24') as s2:
@ -349,12 +349,12 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
ports=fwg_ports,
admin_state_up=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
def test_update_firewall_group_with_new_ports_no_policy(self):
"""neutron firewall_group create test-policy """
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id) as r, \
tenant_id=self._tenant_id) as r, \
self.subnet() as s1, \
self.subnet(cidr='20.0.0.0/24') as s2, \
self.subnet(cidr='30.0.0.0/24') as s3:
@ -387,7 +387,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
ports=fwg_ports,
admin_state_up=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
data = {'firewall_group': {'ports': [port_id2, port_id3]}}
req = self.new_update_request('firewall_groups', data,
fwg1['firewall_group']['id'],
@ -404,7 +404,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
def test_update_firewall_group_with_new_ports_status_pending(self):
"""neutron firewall_group create test-policy """
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id) as r, \
tenant_id=self._tenant_id) as r, \
self.subnet() as s1, \
self.subnet(cidr='20.0.0.0/24') as s2, \
self.subnet(cidr='30.0.0.0/24') as s3:
@ -439,7 +439,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
egress_firewall_policy_id=fwp_id, ports=fwg_ports,
admin_state_up=True) as fwg1:
self.assertEqual(nl_constants.PENDING_CREATE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
data = {'firewall_group': {'ports': [port_id2, port_id3]}}
req = self.new_update_request('firewall_groups', data,
fwg1['firewall_group']['id'])
@ -449,7 +449,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
def test_update_firewall_group_with_new_ports_status_active(self):
"""neutron firewall_group create test-policy """
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id) as r, \
tenant_id=self._tenant_id) as r, \
self.subnet() as s1, \
self.subnet(cidr='20.0.0.0/24') as s2, \
self.subnet(cidr='30.0.0.0/24') as s3:
@ -484,10 +484,11 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
egress_firewall_policy_id=fwp_id, ports=fwg_ports,
admin_state_up=True) as fwg1:
self.assertEqual(nl_constants.PENDING_CREATE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
ctx = context.get_admin_context()
self.callbacks.set_firewall_group_status(ctx,
self.callbacks.set_firewall_group_status(
ctx,
fwg1['firewall_group']['id'], nl_constants.ACTIVE)
data = {'firewall_group': {'ports': [port_id2, port_id3]}}
req = self.new_update_request('firewall_groups', data,
@ -502,7 +503,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
name = "new_firewall_rule1"
attrs = self._get_test_firewall_rule_attrs(name)
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id) as r, \
tenant_id=self._tenant_id) as r, \
self.subnet() as s1:
body = self._router_interface_action(
@ -522,14 +523,16 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
egress_firewall_policy_id=fwp_id, ports=[port_id1],
admin_state_up=True) as fwg1:
self.assertEqual(nl_constants.PENDING_CREATE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
ctx = context.get_admin_context()
self.callbacks.set_firewall_group_status(ctx,
self.callbacks.set_firewall_group_status(
ctx,
fwg1['firewall_group']['id'], nl_constants.ACTIVE)
data = {'firewall_rule': {'name': name}}
req = self.new_update_request('firewall_rules', data,
fwr['firewall_rule']['id'])
req = self.new_update_request(
'firewall_rules',
data, fwr['firewall_rule']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
for k, v in attrs.items():
@ -539,7 +542,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
"""update should fail"""
name = "new_firewall_rule1"
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id) as r, \
tenant_id=self._tenant_id) as r, \
self.subnet() as s1:
body = self._router_interface_action(
@ -559,11 +562,12 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
egress_firewall_policy_id=fwp_id, ports=[port_id1],
admin_state_up=True) as fwg1:
self.assertEqual(nl_constants.PENDING_CREATE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
data = {'firewall_rule': {'name': name}}
req = self.new_update_request('firewall_rules', data,
fwr['firewall_rule']['id'])
req = self.new_update_request(
'firewall_rules',
data, fwr['firewall_rule']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(409, res.status_int)
@ -627,7 +631,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
ports=fwg_ports,
admin_state_up=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
fwp_id = fwp["firewall_policy"]["id"]
data = {'firewall_group': {'ports': fwg_ports}}

View File

@ -66,7 +66,7 @@ class NoopFwaasDriverV2(fwaas_base_v2.FwaasDriverBase):
class TestFWaaSAgentApi(base.BaseTestCase):
def setUp(self):
super(TestFWaaSAgentApi, self).setUp()
super().setUp()
self.api = api.FWaaSPluginApiMixin(
'topic',

View File

@ -27,7 +27,7 @@ FWAAS_NOP_DEVICE = ('neutron_fwaas.tests.unit.services.firewall.'
class TestFirewallDeviceDriverLoading(base.BaseTestCase):
def setUp(self):
super(TestFirewallDeviceDriverLoading, self).setUp()
super().setUp()
self.service = firewall_service.FirewallService()
def test_loading_firewall_device_driver(self):

View File

@ -38,7 +38,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
l3_plugin_str = ('neutron.tests.unit.extensions.test_l3.'
'TestL3NatServicePlugin')
l3_plugin = {'l3_plugin_name': l3_plugin_str}
super(TestOVNFwaasDriver, self).setUp(
super().setUp(
service_provider=OVN_FWAAS_DRIVER,
extra_service_plugins=l3_plugin,
extra_extension_paths=neutron_extensions.__path__)
@ -68,11 +68,11 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
admin_state_up=True,
as_admin=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
def test_create_firewall_group_with_ports(self):
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id, as_admin=True) as r, \
tenant_id=self._tenant_id, as_admin=True) as r, \
self.subnet(as_admin=True) as s1, \
self.subnet(cidr='20.0.0.0/24', as_admin=True) as s2:
body = self._router_interface_action(
@ -108,7 +108,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
def test_update_firewall_group_with_new_ports(self):
with self.router(name='router1', admin_state_up=True,
tenant_id=self._tenant_id, as_admin=True) as r, \
tenant_id=self._tenant_id, as_admin=True) as r, \
self.subnet(as_admin=True) as s1, \
self.subnet(cidr='20.0.0.0/24', as_admin=True) as s2, \
self.subnet(cidr='30.0.0.0/24', as_admin=True) as s3:
@ -149,7 +149,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
ports=fwg_ports, admin_state_up=True,
do_delete=False, as_admin=True) as fwg1:
self.assertEqual(nl_constants.ACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
data = {'firewall_group': {'ports': [port_id2, port_id3]}}
req = self.new_update_request('firewall_groups', data,
fwg1['firewall_group']['id'],
@ -202,7 +202,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
do_delete=False,
as_admin=True) as fwg1:
self.assertEqual(nl_constants.ACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
fwp_id = fwp["firewall_policy"]["id"]
data = {'firewall_group': {'ports': fwg_ports}}
@ -254,7 +254,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
do_delete=False,
as_admin=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
new_rules = [fwr_id, fwr2_id, fwr3_id]
data = {'firewall_policy': {'firewall_rules':
@ -290,7 +290,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
do_delete=False,
as_admin=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
data = {'firewall_rule': {'enabled': False}}
req = (self.
@ -325,7 +325,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
do_delete=False,
as_admin=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
data = {'firewall_rule': {'enabled': True}}
req = (self.
@ -361,7 +361,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
do_delete=False,
as_admin=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
data = {'firewall_rule': {'action': 'deny'}}
req = (self.
@ -398,7 +398,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
do_delete=False,
as_admin=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
data = {'firewall_rule_id': fwr2_id,
'insert_after': fwr_id}
@ -436,7 +436,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
do_delete=False,
as_admin=True) as fwg1:
self.assertEqual(nl_constants.INACTIVE,
fwg1['firewall_group']['status'])
fwg1['firewall_group']['status'])
data = {'firewall_rule_id': fwr2_id}
req = (self.

View File

@ -30,7 +30,7 @@ class FireWallDriverDBMixinTestCase(test_fwaas_plugin_v2.
def setUp(self):
provider = ('neutron_fwaas.services.firewall.service_drivers.'
'driver_api.FirewallDriverDB')
super(FireWallDriverDBMixinTestCase, self).setUp(
super().setUp(
service_provider=provider)
self._mp_registry_publish = mock.patch(
'neutron_lib.callbacks.registry.publish')

View File

@ -37,8 +37,8 @@ from neutron_fwaas.services.firewall.service_drivers.driver_api import \
def http_client_error(req, res):
explanation = "Request '%s %s %s' failed: %s" % (req.method, req.url,
req.body, res.body)
explanation = "Request '{} {} {}' failed: {}".format(req.method, req.url,
req.body, res.body)
return webob.exc.HTTPClientError(code=res.status_int,
explanation=explanation)
@ -65,10 +65,10 @@ class FirewallPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
ADMIN_STATE_UP = True
SHARED = True
resource_prefix_map = dict(
(k, firewall_v2.API_PREFIX)
resource_prefix_map = {
k: firewall_v2.API_PREFIX
for k in firewall_v2.RESOURCE_ATTRIBUTE_MAP.keys()
)
}
def setUp(self, service_provider=None, core_plugin=None,
extra_service_plugins=None, extra_extension_paths=None):
@ -109,7 +109,7 @@ class FirewallPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
plugins,
)
super(FirewallPluginV2TestCase, self).setUp(
super().setUp(
plugin=core_plugin,
service_plugins=service_plugins,
ext_mgr=ext_mgr,
@ -339,11 +339,12 @@ class FirewallPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
firewall_policy = self.deserialize(fmt or self.fmt, res)
fwp_id = firewall_policy["firewall_policy"]["id"]
ingress_firewall_policy_id = fwp_id
data = {'firewall_group': {'name': name,
'description': description,
'ingress_firewall_policy_id': ingress_firewall_policy_id,
'egress_firewall_policy_id': egress_firewall_policy_id,
'admin_state_up': admin_state_up}}
data = {'firewall_group': {
'name': name,
'description': description,
'ingress_firewall_policy_id': ingress_firewall_policy_id,
'egress_firewall_policy_id': egress_firewall_policy_id,
'admin_state_up': admin_state_up}}
ctx = kwargs.get('context', None)
if ctx is None or ctx.is_admin:
tenant_id = kwargs.get('tenant_id', self._tenant_id)
@ -663,7 +664,7 @@ class TestAutomaticAssociation(TestFirewallPluginBasev2):
def setUp(self):
# TODO(yushiro): Replace constant value for this test class
# Set auto association fwg
super(TestAutomaticAssociation, self).setUp()
super().setUp()
def test_vm_port(self):
port = {

View File

@ -35,7 +35,7 @@ class FakeDriver(log_base_driver.DriverBase):
class TestDriverBase(base.BaseTestCase):
def setUp(self):
super(TestDriverBase, self).setUp()
super().setUp()
self.driver = FakeDriver.create()
def test_is_vif_type_compatible(self):

View File

@ -37,15 +37,15 @@ FAKE_BURST = 25
class TestLogPrefix(base.BaseTestCase):
def setUp(self):
super(TestLogPrefix, self).setUp()
super().setUp()
self.log_prefix = log.LogPrefix(FAKE_PORT_ID,
'fake_event',
FAKE_PROJECT_ID)
self.log_prefix.log_object_refs = set([FAKE_LOG_ID])
self.log_prefix.log_object_refs = {FAKE_LOG_ID}
def test_add_log_obj_ref(self):
added_log_id = test_base._uuid
expected_log_obj_ref = set([FAKE_LOG_ID, added_log_id])
expected_log_obj_ref = {FAKE_LOG_ID, added_log_id}
self.log_prefix.add_log_obj_ref(added_log_id)
self.assertEqual(expected_log_obj_ref, self.log_prefix.log_object_refs)
@ -63,7 +63,7 @@ class TestLogPrefix(base.BaseTestCase):
class BaseIptablesLogTestCase(base.BaseTestCase):
def setUp(self):
super(BaseIptablesLogTestCase, self).setUp()
super().setUp()
self.iptables_manager_patch = mock.patch(
'neutron.agent.linux.iptables_manager.IptablesManager')
self.iptables_manager_mock = self.iptables_manager_patch.start()
@ -135,13 +135,13 @@ class BaseIptablesLogTestCase(base.BaseTestCase):
self.log_driver.ipt_mgr_list = self._fake_ipt_mgr_list(f_router_ids)
# Test with a port is delete from router
self.log_driver.unused_port_ids = set(['r1_port1'])
self.log_driver.unused_port_ids = {'r1_port1'}
self.log_driver._cleanup_unused_ipt_mgrs()
self.assertEqual(set(), self.log_driver.unused_port_ids)
self.assertIsNone(self.log_driver.ipt_mgr_list['r1'].get('r1_port1'))
# Test with all ports are deleted from router
self.log_driver.unused_port_ids = set(['r2_port1', 'r2_port2'])
self.log_driver.unused_port_ids = {'r2_port1', 'r2_port2'}
self.log_driver._cleanup_unused_ipt_mgrs()
self.assertEqual(set(), self.log_driver.unused_port_ids)
self.assertIsNone(self.log_driver.ipt_mgr_list.get('r2'))
@ -245,7 +245,7 @@ class BaseIptablesLogTestCase(base.BaseTestCase):
# Test with prefix already added into prefixes_table
self.log_driver._add_nflog_rules_accepted(ipt_mgr, f_port_log)
self.log_driver._add_rules_to_chain_v4v6.assert_not_called()
self.assertEqual(set(['fake_log_id']),
self.assertEqual({'fake_log_id'},
f_accept_prefix.log_object_refs)
# Test with prefixes_tables does not include the prefix
@ -260,7 +260,7 @@ class BaseIptablesLogTestCase(base.BaseTestCase):
assert_called_once_with(ipt_mgr, 'accepted',
v4_rules, v6_rules,
wrap=True, top=True, tag=prefix.id)
self.assertEqual(set(['fake_log_id']),
self.assertEqual({'fake_log_id'},
prefix.log_object_refs)
def test_add_nflog_rules_dropped(self):
@ -283,7 +283,7 @@ class BaseIptablesLogTestCase(base.BaseTestCase):
# Test with prefix already added into prefixes_table
self.log_driver._add_log_rules_dropped(ipt_mgr, f_port_log)
self.log_driver._add_rules_to_chain_v4v6.assert_not_called()
self.assertEqual(set(['fake_log_id']),
self.assertEqual({'fake_log_id'},
f_drop_prefix.log_object_refs)
# Test with prefixes_tables does not include the prefix
@ -302,7 +302,7 @@ class BaseIptablesLogTestCase(base.BaseTestCase):
]
self.log_driver._add_rules_to_chain_v4v6.\
assert_has_calls(calls)
self.assertEqual(set(['fake_log_id']),
self.assertEqual({'fake_log_id'},
prefix.log_object_refs)
def _fake_port_log(self, log_id, event, port_id):
@ -318,8 +318,8 @@ class BaseIptablesLogTestCase(base.BaseTestCase):
'-j NFLOG --nflog-prefix %s'
% (device, FAKE_RATE, FAKE_BURST, tag)]
v4_nflog_rule += ['-o %s -m limit --limit %s/sec --limit-burst %s '
'-j NFLOG --nflog-prefix %s'
% (device, FAKE_RATE, FAKE_BURST, tag)]
'-j NFLOG --nflog-prefix %s'
% (device, FAKE_RATE, FAKE_BURST, tag)]
v6_nflog_rule = ['-i %s -m limit --limit %s/sec --limit-burst %s '
'-j NFLOG --nflog-prefix %s'
% (device, FAKE_RATE, FAKE_BURST, tag)]

View File

@ -27,7 +27,7 @@ from neutron_fwaas.services.logapi.agents.l3 import fwg_log
class FWaaSL3LoggingExtensionInitializeTestCase(base.L3LoggingExtBaseTestCase):
def setUp(self):
super(FWaaSL3LoggingExtensionInitializeTestCase, self).setUp()
super().setUp()
self.fw_l3_log_ext = fwg_log.FWaaSL3LoggingExtension()
self.fw_l3_log_ext.consume_api(self.agent_api)

View File

@ -20,7 +20,7 @@ from neutron.tests.unit import testlib_api
class BaseLogTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(BaseLogTestCase, self).setUp()
super().setUp()
with mock.patch.object(
resource_manager.ResourceCallbacksManager, '_singleton',

View File

@ -54,7 +54,7 @@ def fake_register():
class TestFirewallGroupRuleCallback(base.BaseTestCase):
def setUp(self):
super(TestFirewallGroupRuleCallback, self).setUp()
super().setUp()
self.driver_manager = driver_mgr.LoggingServiceDriverManager()
self.fwg_callback = fwg_callback.FirewallGroupCallBack(mock.Mock(),
mock.Mock())

View File

@ -70,7 +70,7 @@ def _fake_port_object(port_id, device_owner, status,
class LoggingRpcCallbackTestCase(base.BaseTestCase):
def setUp(self):
super(LoggingRpcCallbackTestCase, self).setUp()
super().setUp()
self.context = mock.Mock()
self.rpc_callback = server_rpc.LoggingApiSkeleton()
@ -282,7 +282,7 @@ class LoggingRpcCallbackTestCase(base.BaseTestCase):
unbound_resource = uuidutils.generate_uuid()
resource_log = _create_log_object(tenant_id, unbound_resource)
target_log = _create_log_object(tenant_id, unbound_resource,
target_id)
target_id)
log_objs = [log, target_log, resource_log]
with mock.patch.object(log_object.Log, 'get_objects',

View File

@ -54,7 +54,7 @@ def fake_register():
class TestFirewallGroupRuleCallback(base.BaseTestCase):
def setUp(self):
super(TestFirewallGroupRuleCallback, self).setUp()
super().setUp()
self.driver_manager = driver_mgr.LoggingServiceDriverManager()
self.port_callback = port_callback.NeutronPortCallBack(mock.Mock(),
mock.Mock())

View File

@ -30,7 +30,7 @@ class TestFWGLogRequestValidations(base.BaseTestCase):
"""Test validator for a log creation request"""
def setUp(self):
super(TestFWGLogRequestValidations, self).setUp()
super().setUp()
fwg_validate.fwg_plugin = mock.Mock()
fwg_validate.fwg_plugin.driver = mock.Mock()
fwg_validate.fwg_plugin.driver.firewall_db = mock.Mock()

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at