diff --git a/.pylintrc b/.pylintrc index 80141cf47..3aea2732b 100644 --- a/.pylintrc +++ b/.pylintrc @@ -43,7 +43,6 @@ disable= global-variable-not-assigned, logging-not-lazy, no-init, - non-parent-init-called, pointless-string-statement, protected-access, redefined-builtin, diff --git a/doc/source/conf.py b/doc/source/conf.py index dacf295e4..46b0d1105 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright (c) 2010 OpenStack Foundation. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/neutron_fwaas/db/firewall/firewall_db.py b/neutron_fwaas/db/firewall/firewall_db.py index f3d2018a5..04a6fe375 100644 --- a/neutron_fwaas/db/firewall/firewall_db.py +++ b/neutron_fwaas/db/firewall/firewall_db.py @@ -82,8 +82,8 @@ class FirewallRouterAssociation(model_base.BASEV2): __tablename__ = 'firewall_router_associations' fw_id = sa.Column(sa.String(36), - sa.ForeignKey('firewalls.id', ondelete="CASCADE"), - primary_key=True) + sa.ForeignKey('firewalls.id', ondelete="CASCADE"), + primary_key=True) router_id = sa.Column(sa.String(36), - sa.ForeignKey('routers.id', ondelete="CASCADE"), - primary_key=True) + sa.ForeignKey('routers.id', ondelete="CASCADE"), + primary_key=True) diff --git a/neutron_fwaas/db/firewall/v2/firewall_db_v2.py b/neutron_fwaas/db/firewall/v2/firewall_db_v2.py index bbc3492d1..f8642cf4d 100644 --- a/neutron_fwaas/db/firewall/v2/firewall_db_v2.py +++ b/neutron_fwaas/db/firewall/v2/firewall_db_v2.py @@ -59,11 +59,11 @@ class FirewallDefaultObjectUpdateRestricted(FirewallDefaultParameterExists): "'%(resource_id)s' of type %(resource_type)s.") -class HasName(object): +class HasName: name = sa.Column(sa.String(db_constants.NAME_FIELD_SIZE)) -class HasDescription(object): +class HasDescription: description = sa.Column( sa.String(db_constants.LONG_DESCRIPTION_FIELD_SIZE)) @@ -188,7 +188,7 @@ def _list_firewall_policies_result_filter_hook(query, filters): return query -class FirewallPluginDb(object): +class FirewallPluginDb: def __new__(cls, *args, **kwargs): model_query.register_hook( @@ -204,7 +204,7 @@ class FirewallPluginDb(object): query_hook=None, filter_hook=None, result_filters=_list_firewall_policies_result_filter_hook) - return super(FirewallPluginDb, cls).__new__(cls, *args, **kwargs) + return super().__new__(cls, *args, **kwargs) def _get_firewall_group(self, context, id): try: @@ -254,7 +254,7 @@ class FirewallPluginDb(object): def _validate_fwr_port_range(self, min_port, max_port): if int(min_port) > int(max_port): - port_range = '%s:%s' % (min_port, max_port) + port_range = '{}:{}'.format(min_port, max_port) raise f_exc.FirewallRuleInvalidPortValue(port=port_range) def _get_min_max_ports_from_range(self, port_range): @@ -272,7 +272,7 @@ class FirewallPluginDb(object): if min_port == max_port: return str(min_port) self._validate_fwr_port_range(min_port, max_port) - return '%s:%s' % (min_port, max_port) + return '{}:{}'.format(min_port, max_port) def _make_firewall_rule_dict(self, firewall_rule, fields=None, policies=None): @@ -656,7 +656,7 @@ class FirewallPluginDb(object): rule_id_list, filters): rules_in_fwr_db = model_query.get_collection_query( context, FirewallRuleV2, filters=filters) - rules_dict = dict((fwr_db['id'], fwr_db) for fwr_db in rules_in_fwr_db) + rules_dict = {fwr_db['id']: fwr_db for fwr_db in rules_in_fwr_db} for fwrule_id in rule_id_list: if fwrule_id not in rules_dict: # Bail as soon as we find an invalid rule. @@ -710,7 +710,7 @@ class FirewallPluginDb(object): with db_api.CONTEXT_READER.using(context): fwg_with_fwp_id_db = context.session.query(FirewallGroup).filter( or_(FirewallGroup.ingress_firewall_policy_id == fwp_id, - FirewallGroup.egress_firewall_policy_id == fwp_id)) + FirewallGroup.egress_firewall_policy_id == fwp_id)) for entry in fwg_with_fwp_id_db: if entry.tenant_id != fwp_tenant_id: raise f_exc.FirewallPolicyInUse( @@ -742,7 +742,7 @@ class FirewallPluginDb(object): filters = {'firewall_rule_id': [r_id for r_id in rule_id_list]} # Run a validation on the Firewall Rules table self._check_rules_for_policy_is_valid(context, fwp, fwp_db, - rule_id_list, filters) + rule_id_list, filters) # new rules are valid, lets delete the old association self._delete_all_rules_from_policy(context, fwp_db) # and add in the new association @@ -754,8 +754,8 @@ class FirewallPluginDb(object): context, FirewallPolicyRuleAssociation, filters=filters) - rules_dict = dict((fpol_rul_db['firewall_rule_id'], fpol_rul_db) - for fpol_rul_db in rules_in_fpol_rul_db) + rules_dict = {fpol_rul_db['firewall_rule_id']: fpol_rul_db + for fpol_rul_db in rules_in_fpol_rul_db} fwp_db.rule_associations = [] for fwrule_id in rule_id_list: fwp_db.rule_associations.append(rules_dict[fwrule_id]) @@ -800,7 +800,7 @@ class FirewallPluginDb(object): with db_api.CONTEXT_WRITER.using(context): fwp_db = self._get_firewall_policy(context, id) self._ensure_not_default_resource(fwp_db, 'firewall_policy', - action="update") + action="update") if not fwp.get('shared', True): # an update is setting shared to False, make sure associated # firewall groups are in the same project. diff --git a/neutron_fwaas/db/migration/alembic_migrations/versions/540142f314f4_fwaas_router_insertion.py b/neutron_fwaas/db/migration/alembic_migrations/versions/540142f314f4_fwaas_router_insertion.py index 32e7d9ec2..c9a3b35b7 100644 --- a/neutron_fwaas/db/migration/alembic_migrations/versions/540142f314f4_fwaas_router_insertion.py +++ b/neutron_fwaas/db/migration/alembic_migrations/versions/540142f314f4_fwaas_router_insertion.py @@ -41,14 +41,15 @@ SQL_STATEMENT = ( def upgrade(): op.create_table('firewall_router_associations', - sa.Column('fw_id', sa.String(length=36), nullable=False), - sa.Column('router_id', sa.String(length=36), nullable=False), - sa.ForeignKeyConstraint(['fw_id'], ['firewalls.id'], - ondelete='CASCADE'), - sa.ForeignKeyConstraint(['router_id'], ['routers.id'], - ondelete='CASCADE'), - sa.PrimaryKeyConstraint('fw_id', 'router_id'), - ) + sa.Column('fw_id', sa.String(length=36), nullable=False), + sa.Column('router_id', sa.String( + length=36), nullable=False), + sa.ForeignKeyConstraint(['fw_id'], ['firewalls.id'], + ondelete='CASCADE'), + sa.ForeignKeyConstraint(['router_id'], ['routers.id'], + ondelete='CASCADE'), + sa.PrimaryKeyConstraint('fw_id', 'router_id'), + ) # Depending on when neutron-fwaas is installed with neutron, this script # may be run before or after the neutron core tables have had their diff --git a/neutron_fwaas/db/migration/alembic_migrations/versions/796c68dffbb_cisco_csr_fwaas.py b/neutron_fwaas/db/migration/alembic_migrations/versions/796c68dffbb_cisco_csr_fwaas.py index b9f8f9703..a7b72da53 100644 --- a/neutron_fwaas/db/migration/alembic_migrations/versions/796c68dffbb_cisco_csr_fwaas.py +++ b/neutron_fwaas/db/migration/alembic_migrations/versions/796c68dffbb_cisco_csr_fwaas.py @@ -32,14 +32,16 @@ down_revision = '540142f314f4' def upgrade(active_plugins=None, options=None): op.create_table('cisco_firewall_associations', - sa.Column('fw_id', sa.String(length=36), nullable=False), - sa.Column('port_id', sa.String(length=36), nullable=True), - sa.Column('direction', sa.String(length=16), nullable=True), - sa.Column('acl_id', sa.String(length=36), nullable=True), - sa.Column('router_id', sa.String(length=36), nullable=True), - sa.ForeignKeyConstraint(['fw_id'], ['firewalls.id'], - ondelete='CASCADE'), - sa.ForeignKeyConstraint(['port_id'], ['ports.id'], - ondelete='CASCADE'), - sa.PrimaryKeyConstraint('fw_id') - ) + sa.Column('fw_id', sa.String(length=36), nullable=False), + sa.Column('port_id', sa.String(length=36), nullable=True), + sa.Column('direction', sa.String( + length=16), nullable=True), + sa.Column('acl_id', sa.String(length=36), nullable=True), + sa.Column('router_id', sa.String( + length=36), nullable=True), + sa.ForeignKeyConstraint(['fw_id'], ['firewalls.id'], + ondelete='CASCADE'), + sa.ForeignKeyConstraint(['port_id'], ['ports.id'], + ondelete='CASCADE'), + sa.PrimaryKeyConstraint('fw_id') + ) diff --git a/neutron_fwaas/privileged/netfilter_log/libnetfilter_log.py b/neutron_fwaas/privileged/netfilter_log/libnetfilter_log.py index fdd389e31..7b89251af 100644 --- a/neutron_fwaas/privileged/netfilter_log/libnetfilter_log.py +++ b/neutron_fwaas/privileged/netfilter_log/libnetfilter_log.py @@ -171,7 +171,7 @@ def decode(nfa): 'msg': encodeutils.safe_decode(msg)} -class NFLogWrapper(object): +class NFLogWrapper: """A wrapper for libnetfilter_log api""" _instance = None @@ -304,7 +304,7 @@ def run_nflog(namespace=None, group=0): return pid -class NFLogApp(object): +class NFLogApp: """Log application for handling nflog packets""" callback = None diff --git a/neutron_fwaas/privileged/netlink_lib.py b/neutron_fwaas/privileged/netlink_lib.py index 7fac6d250..894860211 100644 --- a/neutron_fwaas/privileged/netlink_lib.py +++ b/neutron_fwaas/privileged/netlink_lib.py @@ -123,7 +123,7 @@ class ConntrackOpenFailedExit(SystemExit): """Raised if we fail to open a new conntrack or conntrack handler""" -class ConntrackManager(object): +class ConntrackManager: def __init__(self, family_socket=None): self.family_socket = family_socket self.set_functions = { @@ -147,14 +147,14 @@ class ConntrackManager(object): 6: nfct.nfct_set_attr_u16}, } self.converters = {'src': bytes, - 'dst': bytes, - 'ipversion': nl_constants.IPVERSION_SOCKET.get, - 'protocol': constants.IP_PROTOCOL_MAP.get, - 'code': int, - 'type': int, - 'id': libc.htons, - 'sport': libc.htons, - 'dport': libc.htons, } + 'dst': bytes, + 'ipversion': nl_constants.IPVERSION_SOCKET.get, + 'protocol': constants.IP_PROTOCOL_MAP.get, + 'code': int, + 'type': int, + 'id': libc.htons, + 'sport': libc.htons, + 'dport': libc.htons, } def list_entries(self): entries = [] diff --git a/neutron_fwaas/services/firewall/fwaas_plugin_v2.py b/neutron_fwaas/services/firewall/fwaas_plugin_v2.py index 288823b6c..d293529f8 100644 --- a/neutron_fwaas/services/firewall/fwaas_plugin_v2.py +++ b/neutron_fwaas/services/firewall/fwaas_plugin_v2.py @@ -48,7 +48,7 @@ class FirewallPluginV2(Firewallv2PluginBase): path_prefix = firewall_v2.API_PREFIX def __init__(self): - super(FirewallPluginV2, self).__init__() + super().__init__() """Do the initialization for the firewall service plugin here.""" # Initialize the Firewall v2 service plugin service_type_manager = st_db.ServiceTypeManager.get_instance() diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/agents.py b/neutron_fwaas/services/firewall/service_drivers/agents/agents.py index 29d3edfa4..e9612da1b 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/agents.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/agents.py @@ -31,7 +31,7 @@ from neutron_fwaas.services.firewall.service_drivers import driver_api LOG = logging.getLogger(__name__) -class FirewallAgentCallbacks(object): +class FirewallAgentCallbacks: target = oslo_messaging.Target(version='1.0') def __init__(self, firewall_db): @@ -106,7 +106,7 @@ class FirewallAgentCallbacks(object): """Get all projects that have firewall_groups.""" ctx = neutron_context.get_admin_context() fwg_list = self.firewall_db.get_firewall_groups(ctx) - fwg_project_list = list(set(fwg['tenant_id'] for fwg in fwg_list)) + fwg_project_list = list({fwg['tenant_id'] for fwg in fwg_list}) return fwg_project_list @log_helpers.log_method_call @@ -136,7 +136,7 @@ class FirewallAgentCallbacks(object): return fwg -class FirewallAgentApi(object): +class FirewallAgentApi: """Plugin side of plugin to agent RPC API""" def __init__(self, topic, host): @@ -169,7 +169,7 @@ class FirewallAgentDriver(driver_api.FirewallDriverDB, """ def __init__(self, service_plugin): - super(FirewallAgentDriver, self).__init__(service_plugin) + super().__init__(service_plugin) self.agent_rpc = FirewallAgentApi(constants.FW_AGENT, cfg.CONF.host) def is_supported_l2_port(self, port): @@ -250,7 +250,7 @@ class FirewallAgentDriver(driver_api.FirewallDriverDB, ports = firewall_group['ports'] if (not ports or (not firewall_group['ingress_firewall_policy_id'] and - not firewall_group['egress_firewall_policy_id'])): + not firewall_group['egress_firewall_policy_id'])): # no messaging to agent needed and fw needs to go to INACTIVE state # as no associated ports and/or no policy configured. status = nl_constants.INACTIVE @@ -317,9 +317,9 @@ class FirewallAgentDriver(driver_api.FirewallDriverDB, not new_firewall_group['ports']) LOG.debug("update_firewall_group %s: Add Ports: %s, Del Ports: %s", - new_firewall_group['id'], - fwg_with_rules['add-port-ids'], - fwg_with_rules['del-port-ids']) + new_firewall_group['id'], + fwg_with_rules['add-port-ids'], + fwg_with_rules['del-port-ids']) fwg_with_rules['port_details'] = self._get_fwg_port_details( context, fwg_with_rules['del-port-ids']) diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/conntrack_base.py b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/conntrack_base.py index 77dfb1827..d40ba8ab0 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/conntrack_base.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/conntrack_base.py @@ -37,7 +37,7 @@ def load_and_init_conntrack_driver(*args, **kwargs): return conntrack_driver -class ConntrackDriverBase(object, metaclass=abc.ABCMeta): +class ConntrackDriverBase(metaclass=abc.ABCMeta): """Base Driver for Conntrack""" @abc.abstractmethod diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/fwaas_base.py b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/fwaas_base.py index afdb7d15d..942f26007 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/fwaas_base.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/fwaas_base.py @@ -16,7 +16,7 @@ import abc -class FwaasDriverBase(object, metaclass=abc.ABCMeta): +class FwaasDriverBase(metaclass=abc.ABCMeta): """Firewall as a Service Driver base class. Using FwaasDriver Class, an instance of L3 perimeter Firewall diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/fwaas_base_v2.py b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/fwaas_base_v2.py index 174114773..2387cd113 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/fwaas_base_v2.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/fwaas_base_v2.py @@ -16,7 +16,7 @@ import abc -class FwaasDriverBase(object, metaclass=abc.ABCMeta): +class FwaasDriverBase(metaclass=abc.ABCMeta): """Firewall as a Service Driver base class. Using FwaasDriver Class, an instance of L3 perimeter Firewall diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/iptables_fwaas_v2.py b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/iptables_fwaas_v2.py index 8a04511e2..eb811d3e9 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/iptables_fwaas_v2.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/iptables_fwaas_v2.py @@ -69,7 +69,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase): self.conntrack = conntrack_base.load_and_init_conntrack_driver() def _get_intf_name(self, if_prefix, port_id): - _name = "%s%s" % (if_prefix, port_id) + _name = "{}{}".format(if_prefix, port_id) return _name[:MAX_INTF_NAME_LEN] def create_firewall_group(self, agent_mode, apply_list, firewall): @@ -140,11 +140,12 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase): if firewall['admin_state_up']: self._setup_firewall(agent_mode, apply_list, firewall) if self.pre_firewall: - self._remove_conntrack_updated_firewall(agent_mode, - apply_list, self.pre_firewall, firewall) + self._remove_conntrack_updated_firewall( + agent_mode, + apply_list, self.pre_firewall, firewall) else: self._remove_conntrack_new_firewall(agent_mode, - apply_list, firewall) + apply_list, firewall) else: self.apply_default_policy(agent_mode, apply_list, firewall) self.pre_firewall = dict(firewall) @@ -210,9 +211,9 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase): ipt_mgr.defer_apply_off() def _get_chain_name(self, fwid, ver, direction): - return '%s%s%s' % (CHAIN_NAME_PREFIX[direction], - IP_VER_TAG[ver], - fwid) + return '{}{}{}'.format(CHAIN_NAME_PREFIX[direction], + IP_VER_TAG[ver], + fwid) def _setup_chains(self, firewall, ipt_if_prefix, router_fw_ports): """Create Fwaas chain using the rules in the policy @@ -293,7 +294,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase): fw_rules = firewall[fw_rule_list] fw_rule_ids = [fw_rule['id'] for fw_rule in fw_rules] removed_rules.extend([pre_fw_rule for pre_fw_rule in pre_fw_rules - if pre_fw_rule['id'] not in fw_rule_ids]) + if pre_fw_rule['id'] not in fw_rule_ids]) return removed_rules def _find_new_rules(self, pre_firewall, firewall): @@ -301,7 +302,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase): def _remove_conntrack_new_firewall(self, agent_mode, apply_list, firewall): """Remove conntrack when create new firewall""" - routers_list = list(set([apply_info[0] for apply_info in apply_list])) + routers_list = list({apply_info[0] for apply_info in apply_list}) for ri in routers_list: ipt_if_prefix_list = self._get_ipt_mgrs_with_if_prefix( agent_mode, ri) @@ -312,7 +313,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase): def _remove_conntrack_updated_firewall(self, agent_mode, apply_list, pre_firewall, firewall): """Remove conntrack when updated firewall""" - routers_list = list(set([apply_info[0] for apply_info in apply_list])) + routers_list = list({apply_info[0] for apply_info in apply_list}) for ri in routers_list: ipt_if_prefix_list = self._get_ipt_mgrs_with_if_prefix( agent_mode, ri) @@ -418,7 +419,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase): def _get_action_chain(self, name): binary_name = iptables_manager.binary_name chain_name = iptables_manager.get_chain_name(name) - return '%s-%s' % (binary_name, chain_name) + return '{}-{}'.format(binary_name, chain_name) def _enable_policy_chain(self, fwid, ipt_if_prefix, router_fw_ports): bname = iptables_manager.binary_name @@ -435,18 +436,18 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase): for router_fw_port in router_fw_ports: intf_name = self._get_intf_name(if_prefix, router_fw_port) - jump_rule = ['%s %s -j %s-%s' % ( + jump_rule = ['{} {} -j {}-{}'.format( IPTABLES_DIR[direction], intf_name, bname, chain_name)] self._add_rules_to_chain(ipt_mgr, ver, - 'FORWARD', jump_rule) + 'FORWARD', jump_rule) # jump to DROP_ALL policy chain_name = iptables_manager.get_chain_name(FWAAS_DEFAULT_CHAIN) for router_fw_port in router_fw_ports: intf_name = self._get_intf_name(if_prefix, router_fw_port) - jump_rule = ['-o %s -j %s-%s' % (intf_name, bname, chain_name)] + jump_rule = ['-o {} -j {}-{}'.format(intf_name, bname, chain_name)] self._add_rules_to_chain(ipt_mgr, IPV4, 'FORWARD', jump_rule) self._add_rules_to_chain(ipt_mgr, IPV6, 'FORWARD', jump_rule) @@ -455,7 +456,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase): for router_fw_port in router_fw_ports: intf_name = self._get_intf_name(if_prefix, router_fw_port) - jump_rule = ['-i %s -j %s-%s' % (intf_name, bname, chain_name)] + jump_rule = ['-i {} -j {}-{}'.format(intf_name, bname, chain_name)] self._add_rules_to_chain(ipt_mgr, IPV4, 'FORWARD', jump_rule) self._add_rules_to_chain(ipt_mgr, IPV6, 'FORWARD', jump_rule) diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/driver_base.py b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/driver_base.py index 6abcbe8ba..5ce1cbcd1 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/driver_base.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/driver_base.py @@ -16,7 +16,7 @@ import abc import contextlib -class FirewallL2DriverBase(object, metaclass=abc.ABCMeta): +class FirewallL2DriverBase(metaclass=abc.ABCMeta): """Abstract firewall L2 driver base""" def __init__(self, integration_bridge, sg_enabled=False): diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/firewall.py b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/firewall.py index 1143e5e05..76dced671 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/firewall.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/firewall.py @@ -51,7 +51,7 @@ def _replace_register(flow_params, register_number, register_value): try: reg_port = flow_params[register_value] del flow_params[register_value] - flow_params['reg{:d}'.format(register_number)] = reg_port + flow_params[f'reg{register_number:d}'] = reg_port except KeyError: pass @@ -64,7 +64,7 @@ def create_reg_numbers(flow_params): _replace_register(flow_params, fwaas_ovs_consts.REG_NET, 'reg_net') -class FirewallGroup(object): +class FirewallGroup: def __init__(self, id_): self.id = id_ self.ingress_rules = [] @@ -105,7 +105,7 @@ class FirewallGroup(object): # NOTE(ivasilevskaya) That's a copy-paste from neutron ovsfw driver that # differs only in firewall groups list field name -class OFPort(object): +class OFPort: def __init__(self, port_dict, ovs_port, vlan_tag): self.id = port_dict['device'] self.vlan_tag = vlan_tag @@ -157,7 +157,7 @@ class OFPort(object): # NOTE(ivasilevskaya) That's a copy-paste from neutron ovsfw driver that # differs in methods name [s/sg/fwg] and update_rules method. -class FWGPortMap(object): +class FWGPortMap: def __init__(self): self.ports = {} self.fw_groups = {} @@ -253,7 +253,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase): dl_type = kwargs.get('dl_type') create_reg_numbers(kwargs) if isinstance(dl_type, int): - kwargs['dl_type'] = "0x{:04x}".format(dl_type) + kwargs['dl_type'] = f"0x{dl_type:04x}" if self._deferred: self.int_br.add_flow(**kwargs) else: @@ -667,7 +667,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase): nw_proto=lib_const.PROTO_NUM_UDP, tp_src=src_port, tp_dst=dst_port, - actions='resubmit(,{:d})'.format(accept_or_ingress) + actions=f'resubmit(,{accept_or_ingress:d})' ) # Ban dhcp service running on an instance for dl_type, src_port, dst_port in ( @@ -724,7 +724,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase): fwaas_ovs_consts.FW_BASE_INGRESS_TABLE), ) for ethertype in [lib_const.ETHERTYPE_IP, - lib_const.ETHERTYPE_IPV6]: + lib_const.ETHERTYPE_IPV6]: self._add_flow( table=fwaas_ovs_consts.FW_ACCEPT_OR_INGRESS_TABLE, priority=90, @@ -807,7 +807,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase): dl_type=lib_const.ETHERTYPE_IPV6, nw_proto=lib_const.PROTO_NUM_IPV6_ICMP, icmp_type=icmp_type, - actions='output:{:d}'.format(port.ofport) + actions=f'output:{port.ofport:d}' ) # NOTE(ivasilevskaya) That's a copy-paste from neutron ovsfw driver @@ -819,7 +819,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase): priority=100, dl_type=lib_const.ETHERTYPE_ARP, reg_port=port.ofport, - actions='output:{:d}'.format(port.ofport) + actions=f'output:{port.ofport:d}' ) self._initialize_ingress_ipv6_icmp(port) @@ -835,7 +835,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase): nw_proto=lib_const.PROTO_NUM_UDP, tp_src=src_port, tp_dst=dst_port, - actions='output:{:d}'.format(port.ofport) + actions=f'output:{port.ofport:d}' ) # Track untracked @@ -888,7 +888,7 @@ class OVSFirewallDriver(driver_base.FirewallL2DriverBase): ct_state=state, ct_mark=fwaas_ovs_consts.CT_MARK_NORMAL, ct_zone=port.vlan_tag, - actions='output:{:d}'.format(port.ofport) + actions=f'output:{port.ofport:d}' ) self._add_flow( table=fwaas_ovs_consts.FW_RULES_INGRESS_TABLE, diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/rules.py b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/rules.py index 1d227207c..d0936b36f 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/rules.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/rules.py @@ -87,7 +87,7 @@ def populate_flow_common(direction, flow_template, port): """Initialize common flow fields.""" if direction == n_consts.INGRESS_DIRECTION: flow_template['table'] = fwaas_ovs_consts.FW_RULES_INGRESS_TABLE - flow_template['actions'] = "output:{:d}".format(port.ofport) + flow_template['actions'] = f"output:{port.ofport:d}" elif direction == n_consts.EGRESS_DIRECTION: flow_template['table'] = fwaas_ovs_consts.FW_RULES_EGRESS_TABLE # Traffic can be both ingress and egress, check that no ingress rules @@ -121,10 +121,10 @@ def create_port_range_flows(flow_template, rule): if protocol is None: return [] flows = [] - src_port_match = '{:s}_src'.format(protocol) + src_port_match = f'{protocol:s}_src' src_port_min = rule.get('source_port_range_min') src_port_max = rule.get('source_port_range_max') - dst_port_match = '{:s}_dst'.format(protocol) + dst_port_match = f'{protocol:s}_dst' dst_port_min = rule.get('port_range_min') dst_port_max = rule.get('port_range_max') diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/firewall_agent_api.py b/neutron_fwaas/services/firewall/service_drivers/agents/firewall_agent_api.py index 9fb36113c..596d20718 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/firewall_agent_api.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/firewall_agent_api.py @@ -50,12 +50,12 @@ FWaaSOpts = [ cfg.CONF.register_opts(FWaaSOpts, 'fwaas') -class FWaaSPluginApiMixin(object): +class FWaaSPluginApiMixin: """Agent side of the FWaaS agent to FWaaS Plugin RPC API.""" def __init__(self, topic, host): # NOTE(annp): Mixin class should call super - super(FWaaSPluginApiMixin, self).__init__() + super().__init__() self.host = host target = oslo_messaging.Target(topic=topic, version='1.0') @@ -74,12 +74,12 @@ class FWaaSPluginApiMixin(object): firewall_id=firewall_id) -class FWaaSAgentRpcCallbackMixin(object): +class FWaaSAgentRpcCallbackMixin: """Mixin for FWaaS agent Implementations.""" def __init__(self, host): - super(FWaaSAgentRpcCallbackMixin, self).__init__(host) + super().__init__(host) def create_firewall(self, context, firewall, host): """Handle RPC cast from plugin to create a firewall.""" diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/firewall_service.py b/neutron_fwaas/services/firewall/service_drivers/agents/firewall_service.py index aa0f17d7a..b4819701a 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/firewall_service.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/firewall_service.py @@ -25,7 +25,7 @@ LOG = logging.getLogger(__name__) FIREWALL_DRIVERS = 'firewall_drivers' -class FirewallService(object): +class FirewallService: """Firewall Service observer.""" def load_device_drivers(self): diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/l2/fwaas_v2.py b/neutron_fwaas/services/firewall/service_drivers/agents/l2/fwaas_v2.py index 2862391d4..13d4a479d 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/l2/fwaas_v2.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/l2/fwaas_v2.py @@ -409,7 +409,7 @@ class FWaaSV2AgentExtension(l2_extension.L2AgentExtension): self._send_fwg_status(context, fwg['id'], status, self.conf.host) -class PortFirewallGroupMap(object): +class PortFirewallGroupMap: """Store relations between Port and Firewall Group and trusted port This map is used in deleting firewall_group because the firewall_group has diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/l3reference/firewall_l3_agent_v2.py b/neutron_fwaas/services/firewall/service_drivers/agents/l3reference/firewall_l3_agent_v2.py index 6043023a0..0c68e177e 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/l3reference/firewall_l3_agent_v2.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/l3reference/firewall_l3_agent_v2.py @@ -37,14 +37,14 @@ LOG = logging.getLogger(__name__) class FWaaSL3PluginApi(api.FWaaSPluginApiMixin): """Agent side of the FWaaS agent-to-plugin RPC API.""" def __init__(self, topic, host): - super(FWaaSL3PluginApi, self).__init__(topic, host) + super().__init__(topic, host) def get_firewall_groups_for_project(self, context, **kwargs): """Fetches a project's firewall groups from the plugin.""" LOG.debug("Fetch firewall groups from plugin") cctxt = self.client.prepare() return cctxt.call(context, 'get_firewall_groups_for_project', - host=self.host) + host=self.host) def get_projects_with_firewall_groups(self, context, **kwargs): """Fetches from the plugin all projects that have firewall groups @@ -61,14 +61,14 @@ class FWaaSL3PluginApi(api.FWaaSPluginApiMixin): LOG.debug("Notify plugin that firewall group has been deleted") cctxt = self.client.prepare() return cctxt.call(context, 'firewall_group_deleted', fwg_id=fwg_id, - host=self.host) + host=self.host) def set_firewall_group_status(self, context, fwg_id, status, **kwargs): """Sets firewall group's status on the plugin.""" LOG.debug("Set firewall groups from plugin") cctxt = self.client.prepare() return cctxt.call(context, 'set_firewall_group_status', - fwg_id=fwg_id, status=status, host=self.host) + fwg_id=fwg_id, status=status, host=self.host) class FWaaSL3AgentExtension(l3_extension.L3AgentExtension): @@ -125,7 +125,7 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension): self.services_sync_needed = False self.fwplugin_rpc = FWaaSL3PluginApi(fwaas_constants.FIREWALL_PLUGIN, host) - super(FWaaSL3AgentExtension, self).__init__() + super().__init__() @property def _local_namespaces(self): @@ -141,7 +141,7 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension): return 'add-port-ids' in firewall_group def _get_firewall_group_ports(self, context, firewall_group, - to_delete=False, require_new_plugin=False): + to_delete=False, require_new_plugin=False): """Returns in-namespace ports, either from firewall group dict if ports update or from project routers otherwise if only policies update. @@ -160,7 +160,7 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension): for router in routers: if router.router['tenant_id'] == firewall_group['tenant_id']: fwg_port_ids.extend([p['id'] for p in - router.internal_ports]) + router.internal_ports]) # Return in-namespace port objects. return self._get_in_ns_ports(fwg_port_ids) @@ -260,9 +260,9 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension): return # Apply a firewall group, as requested, to ports on the new router. - all_router_ports = set( + all_router_ports = { p['id'] for p in updated_router[nl_constants.INTERFACE_KEY] - ) + } processed_ports = set() for firewall_group in fwg_list: if not self._has_port_insertion_fields(firewall_group): @@ -368,8 +368,8 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension): return LOG.debug("Create firewall group %(fwg_id)s on ports: %(ports)s", - {'fwg_id': firewall_group['id'], - 'ports': ', '.join([p for ri_ports in ports_for_fwg + {'fwg_id': firewall_group['id'], + 'ports': ', '.join([p for ri_ports in ports_for_fwg for p in ri_ports[1]])}) # Set firewall group status; will be overwritten if call to driver @@ -392,8 +392,8 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension): # Send firewall group's status to plugin. try: - self.fwplugin_rpc.set_firewall_group_status(context, - firewall_group['id'], status) + self.fwplugin_rpc.set_firewall_group_status( + context, firewall_group['id'], status) except Exception: msg = ("FWaaS RPC failure in create_firewall_group " "for firewall group: %(fwg_id)s") @@ -485,8 +485,8 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension): # Return status to plugin. try: - self.fwplugin_rpc.set_firewall_group_status(context, - firewall_group['id'], status) + self.fwplugin_rpc.set_firewall_group_status( + context, firewall_group['id'], status) except Exception: LOG.exception("FWaaS RPC failure in update_firewall_group " "for firewall group: %s", firewall_group['id']) @@ -528,11 +528,11 @@ class FWaaSL3AgentExtension(l3_extension.L3AgentExtension): # plugin, as appropriate. try: if status in [nl_constants.ACTIVE, nl_constants.DOWN]: - self.fwplugin_rpc.firewall_group_deleted(context, - firewall_group['id']) + self.fwplugin_rpc.firewall_group_deleted( + context, firewall_group['id']) else: - self.fwplugin_rpc.set_firewall_group_status(context, - firewall_group['id'], status) + self.fwplugin_rpc.set_firewall_group_status( + context, firewall_group['id'], status) except Exception: LOG.exception("FWaaS RPC failure in delete_firewall_group " "for firewall group: %s", firewall_group['id']) @@ -549,4 +549,4 @@ class L3WithFWaaS(FWaaSL3AgentExtension): self.conf = conf else: self.conf = cfg.CONF - super(L3WithFWaaS, self).__init__(host=self.conf.host, conf=self.conf) + super().__init__(host=self.conf.host, conf=self.conf) diff --git a/neutron_fwaas/services/firewall/service_drivers/driver_api.py b/neutron_fwaas/services/firewall/service_drivers/driver_api.py index 17d692d7f..ee7b1af98 100644 --- a/neutron_fwaas/services/firewall/service_drivers/driver_api.py +++ b/neutron_fwaas/services/firewall/service_drivers/driver_api.py @@ -31,7 +31,7 @@ from neutron_fwaas.db.firewall.v2 import firewall_db_v2 LOG = logging.getLogger(__name__) -class FirewallDriver(object, metaclass=abc.ABCMeta): +class FirewallDriver(metaclass=abc.ABCMeta): """Firewall v2 interface for driver That driver interface does not persist Firewall v2 data in any database. @@ -131,7 +131,7 @@ class FirewallDriverDBMixin(FirewallDriver, metaclass=abc.ABCMeta): """ def __init__(self, *args, **kwargs): - super(FirewallDriverDBMixin, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self.firewall_db = firewall_db_v2.FirewallPluginDb() @staticmethod @@ -517,7 +517,7 @@ class FirewallDriverDB(FirewallDriverDBMixin): pass -class FirewallDriverRPCMixin(object, metaclass=abc.ABCMeta): +class FirewallDriverRPCMixin(metaclass=abc.ABCMeta): """FirewallAgent interface for driver with rpc callback listener. Each firewall backend driver that needs a rpc callback listener should diff --git a/neutron_fwaas/services/firewall/service_drivers/ovn/acl.py b/neutron_fwaas/services/firewall/service_drivers/ovn/acl.py index 9de721ea6..510d68437 100644 --- a/neutron_fwaas/services/firewall/service_drivers/ovn/acl.py +++ b/neutron_fwaas/services/firewall/service_drivers/ovn/acl.py @@ -27,7 +27,7 @@ def acl_direction(direction, port_group=None): portdir = 'inport' else: portdir = 'outport' - return '%s == @%s' % (portdir, port_group) + return '{} == @{}'.format(portdir, port_group) def acl_ethertype(rule): @@ -48,8 +48,8 @@ def acl_ethertype(rule): def acl_ip(rule, ip_version): src_ip = rule.get('source_ip_address') dst_ip = rule.get('destination_ip_address') - src = ' && %s.src == %s' % (ip_version, src_ip) if src_ip else '' - dst = ' && %s.dst == %s' % (ip_version, dst_ip) if dst_ip else '' + src = ' && {}.src == {}'.format(ip_version, src_ip) if src_ip else '' + dst = ' && {}.dst == {}'.format(ip_version, dst_ip) if dst_ip else '' return src + dst diff --git a/neutron_fwaas/services/firewall/service_drivers/ovn/firewall_l3_driver.py b/neutron_fwaas/services/firewall/service_drivers/ovn/firewall_l3_driver.py index 715bd86bd..ea30a5615 100644 --- a/neutron_fwaas/services/firewall/service_drivers/ovn/firewall_l3_driver.py +++ b/neutron_fwaas/services/firewall/service_drivers/ovn/firewall_l3_driver.py @@ -36,7 +36,7 @@ class OVNFwaasDriver(driver_api.FirewallDriverDB): """ def __init__(self, service_plugin): - super(OVNFwaasDriver, self).__init__(service_plugin) + super().__init__(service_plugin) self._mech = None def is_supported_l2_port(self, port): diff --git a/neutron_fwaas/services/logapi/agents/drivers/iptables/log.py b/neutron_fwaas/services/logapi/agents/drivers/iptables/log.py index f22a662fe..d7b75022b 100644 --- a/neutron_fwaas/services/logapi/agents/drivers/iptables/log.py +++ b/neutron_fwaas/services/logapi/agents/drivers/iptables/log.py @@ -64,7 +64,7 @@ def setup_logging(): LOG.logger.addHandler(syslog_handler) -class LogPrefix(object): +class LogPrefix: """LogPrefix could be used as prefix in NFLOG rules Each of a couple (port_id, event) has its own LogPrefix object """ @@ -99,7 +99,7 @@ class LogPrefix(object): return not self.log_object_refs -class FWGPortLog(object): +class FWGPortLog: """A firewall group port log per log_object""" def __init__(self, port_id, log_info): @@ -234,7 +234,7 @@ class IptablesLoggingDriver(log_ext.LoggingDriver): # Start libnetfilter_log after router starting up pid = libnflog.run_nflog(router_info.ns_name) LOG.debug("NFLOG process ID %s for router %s has started", - pid, router_info.router_id) + pid, router_info.router_id) self.nflog_proc_map[router_id] = pid else: # Handle the log request @@ -502,7 +502,7 @@ class IptablesLoggingDriver(log_ext.LoggingDriver): def _generate_iptables_args(self, direction, device, prefix=None): direction_config = ['-%s %s' % - (IPTABLES_DIRECTION_DEVICE[direction], device)] + (IPTABLES_DIRECTION_DEVICE[direction], device)] match_rule = [] if self.rate_limit: match_rule += [ diff --git a/neutron_fwaas/services/logapi/agents/l3/fwg_log.py b/neutron_fwaas/services/logapi/agents/l3/fwg_log.py index 4d5262ba9..e5c940ab5 100644 --- a/neutron_fwaas/services/logapi/agents/l3/fwg_log.py +++ b/neutron_fwaas/services/logapi/agents/l3/fwg_log.py @@ -23,7 +23,7 @@ FIREWALL_LOG_DRIVER_NAME = 'fwaas_v2_log' class FWaaSL3LoggingExtension(base.L3LoggingExtensionBase, - l3_extension.L3AgentExtension): + l3_extension.L3AgentExtension): def initialize(self, connection, driver_type): """Initialize L3 logging agent extension""" diff --git a/neutron_fwaas/tests/fullstack/base.py b/neutron_fwaas/tests/fullstack/base.py index 8d7014a07..2c1619e78 100644 --- a/neutron_fwaas/tests/fullstack/base.py +++ b/neutron_fwaas/tests/fullstack/base.py @@ -34,7 +34,7 @@ class BaseFullStackTestCase(testlib_api.MySQLTestCaseMixin, BUILD_WITH_MIGRATIONS = True def setUp(self, environment): - super(BaseFullStackTestCase, self).setUp() + super().setUp() tests_base.setup_test_logging( cfg.CONF, DEFAULT_LOG_DIR, '%s.txt' % self.get_name()) @@ -66,4 +66,4 @@ class BaseFullStackTestCase(testlib_api.MySQLTestCaseMixin, def get_name(self): class_name, test_name = self.id().split(".")[-2:] - return "%s.%s" % (class_name, test_name) + return "{}.{}".format(class_name, test_name) diff --git a/neutron_fwaas/tests/fullstack/resources/client.py b/neutron_fwaas/tests/fullstack/resources/client.py index d97675fd0..3871393a5 100644 --- a/neutron_fwaas/tests/fullstack/resources/client.py +++ b/neutron_fwaas/tests/fullstack/resources/client.py @@ -38,7 +38,7 @@ class ClientFixture(fixtures.Fixture): """Manage and cleanup neutron resources.""" def __init__(self, client): - super(ClientFixture, self).__init__() + super().__init__() self.client = client def _create_resource(self, resource_type, spec): diff --git a/neutron_fwaas/tests/fullstack/resources/config.py b/neutron_fwaas/tests/fullstack/resources/config.py index 60b2114ef..6f29a1e00 100644 --- a/neutron_fwaas/tests/fullstack/resources/config.py +++ b/neutron_fwaas/tests/fullstack/resources/config.py @@ -33,7 +33,7 @@ class ConfigFixture(fixtures.Fixture): is initializing a new instance of the class. """ def __init__(self, env_desc, host_desc, temp_dir, base_filename): - super(ConfigFixture, self).__init__() + super().__init__() self.config = config_fixtures.ConfigDict() self.env_desc = env_desc self.host_desc = host_desc @@ -51,7 +51,7 @@ class NeutronConfigFixture(ConfigFixture): def __init__(self, env_desc, host_desc, temp_dir, connection, rabbitmq_environment): - super(NeutronConfigFixture, self).__init__( + super().__init__( env_desc, host_desc, temp_dir, base_filename='neutron.conf') service_plugins = ['router', 'trunk'] @@ -90,7 +90,7 @@ class NeutronConfigFixture(ConfigFixture): 'bind_port': self.useFixture( port.ExclusivePort(constants.PROTO_NAME_TCP)).port }) - super(NeutronConfigFixture, self)._setUp() + super()._setUp() def _generate_host(self): return utils.get_rand_name(prefix='host-') @@ -110,7 +110,7 @@ class NeutronConfigFixture(ConfigFixture): class ML2ConfigFixture(ConfigFixture): def __init__(self, env_desc, host_desc, temp_dir, tenant_network_types): - super(ML2ConfigFixture, self).__init__( + super().__init__( env_desc, host_desc, temp_dir, base_filename='ml2_conf.ini') mechanism_drivers = self.env_desc.mech_drivers @@ -141,7 +141,7 @@ class ML2ConfigFixture(ConfigFixture): class OVSConfigFixture(ConfigFixture): def __init__(self, env_desc, host_desc, temp_dir, local_ip): - super(OVSConfigFixture, self).__init__( + super().__init__( env_desc, host_desc, temp_dir, base_filename='openvswitch_agent.ini') @@ -182,7 +182,7 @@ class OVSConfigFixture(ConfigFixture): 'of_listen_port': self.useFixture( port.ExclusivePort(constants.PROTO_NAME_TCP)).port }) - super(OVSConfigFixture, self)._setUp() + super()._setUp() def _generate_bridge_mappings(self): return 'physnet1:%s' % utils.get_rand_device_name(prefix='br-eth') @@ -213,7 +213,7 @@ class LinuxBridgeConfigFixture(ConfigFixture): def __init__(self, env_desc, host_desc, temp_dir, local_ip, physical_device_name): - super(LinuxBridgeConfigFixture, self).__init__( + super().__init__( env_desc, host_desc, temp_dir, base_filename="linuxbridge_agent.ini" ) @@ -255,7 +255,7 @@ class LinuxBridgeConfigFixture(ConfigFixture): class L3ConfigFixture(ConfigFixture): def __init__(self, env_desc, host_desc, temp_dir, integration_bridge=None): - super(L3ConfigFixture, self).__init__( + super().__init__( env_desc, host_desc, temp_dir, base_filename='l3_agent.ini') if host_desc.l2_agent_type == constants.AGENT_TYPE_OVS: self._prepare_config_with_ovs_agent(integration_bridge) diff --git a/neutron_fwaas/tests/fullstack/resources/environment.py b/neutron_fwaas/tests/fullstack/resources/environment.py index 955e67a1a..fed2f3400 100644 --- a/neutron_fwaas/tests/fullstack/resources/environment.py +++ b/neutron_fwaas/tests/fullstack/resources/environment.py @@ -28,7 +28,7 @@ from neutron.tests.fullstack.resources import config from neutron.tests.fullstack.resources import process -class EnvironmentDescription(object): +class EnvironmentDescription: """A set of characteristics of an environment setup. Does the setup, as a whole, support tunneling? How about l2pop? @@ -47,7 +47,7 @@ class EnvironmentDescription(object): return self.network_type in ('vxlan', 'gre') -class HostDescription(object): +class HostDescription: """A set of characteristics of an environment Host. What agents should the host spawn? What mode should each agent operate @@ -275,7 +275,7 @@ class Environment(fixtures.Fixture): :param hosts_desc: A list of HostDescription instances. """ - super(Environment, self).__init__() + super().__init__() self.env_desc = env_desc self.hosts_desc = hosts_desc self.hosts = [] diff --git a/neutron_fwaas/tests/fullstack/resources/machine.py b/neutron_fwaas/tests/fullstack/resources/machine.py index c263a2f05..2e1d10ef5 100644 --- a/neutron_fwaas/tests/fullstack/resources/machine.py +++ b/neutron_fwaas/tests/fullstack/resources/machine.py @@ -44,7 +44,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase): def __init__(self, host, network_id, tenant_id, safe_client, neutron_port=None, bridge_name=None): - super(FakeFullstackMachine, self).__init__() + super().__init__() self.host = host self.tenant_id = tenant_id self.network_id = network_id @@ -53,7 +53,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase): self.bridge_name = bridge_name def _setUp(self): - super(FakeFullstackMachine, self)._setUp() + super()._setUp() self.bridge = self._get_bridge() @@ -81,8 +81,8 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase): self.neutron_port['id'], {'port': {pbs.HOST_ID: self.host.hostname}}) self.addCleanup(self.safe_client.client.update_port, - self.neutron_port['id'], - {'port': {pbs.HOST_ID: ''}}) + self.neutron_port['id'], + {'port': {pbs.HOST_ID: ''}}) def _get_bridge(self): if self.bridge_name is None: @@ -107,7 +107,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase): subnet_id = fixed_ip['subnet_id'] subnet = self.safe_client.client.show_subnet(subnet_id) prefixlen = netaddr.IPNetwork(subnet['subnet']['cidr']).prefixlen - self._ip_cidr = '%s/%s' % (self._ip, prefixlen) + self._ip_cidr = '{}/{}'.format(self._ip, prefixlen) # TODO(amuller): Support DHCP self.port.addr.add(self.ip_cidr) @@ -152,7 +152,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase): class FakeFullstackTrunkMachine(FakeFullstackMachine): def __init__(self, trunk, *args, **kwargs): - super(FakeFullstackTrunkMachine, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self.trunk = trunk def add_vlan_interface(self, mac_address, ip_address, segmentation_id): diff --git a/neutron_fwaas/tests/fullstack/resources/process.py b/neutron_fwaas/tests/fullstack/resources/process.py index 9f8727bc9..286c27e2e 100644 --- a/neutron_fwaas/tests/fullstack/resources/process.py +++ b/neutron_fwaas/tests/fullstack/resources/process.py @@ -32,7 +32,7 @@ from neutron.tests.fullstack import base as fullstack_base class ProcessFixture(fixtures.Fixture): def __init__(self, test_name, process_name, exec_name, config_filenames, namespace=None, kill_signal=signal.SIGKILL): - super(ProcessFixture, self).__init__() + super().__init__() self.test_name = test_name self.process_name = process_name self.exec_name = exec_name @@ -52,7 +52,7 @@ class ProcessFixture(fixtures.Fixture): common_utils.ensure_dir(log_dir) timestamp = datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S-%f") - log_file = "%s--%s.log" % (self.process_name, timestamp) + log_file = "{}--{}.log".format(self.process_name, timestamp) cmd = [shutil.which(self.exec_name), '--log-dir', log_dir, '--log-file', log_file] @@ -71,7 +71,7 @@ class ProcessFixture(fixtures.Fixture): class RabbitmqEnvironmentFixture(fixtures.Fixture): def __init__(self, host="127.0.0.1"): - super(RabbitmqEnvironmentFixture, self).__init__() + super().__init__() self.host = host def _setUp(self): @@ -100,7 +100,7 @@ class NeutronServerFixture(fixtures.Fixture): def __init__(self, env_desc, host_desc, test_name, neutron_cfg_fixture, plugin_cfg_fixture): - super(NeutronServerFixture, self).__init__() + super().__init__() self.env_desc = env_desc self.host_desc = host_desc self.test_name = test_name @@ -140,7 +140,7 @@ class OVSAgentFixture(fixtures.Fixture): def __init__(self, env_desc, host_desc, test_name, neutron_cfg_fixture, agent_cfg_fixture): - super(OVSAgentFixture, self).__init__() + super().__init__() self.env_desc = env_desc self.host_desc = host_desc self.test_name = test_name @@ -174,7 +174,7 @@ class LinuxBridgeAgentFixture(fixtures.Fixture): def __init__(self, env_desc, host_desc, test_name, neutron_cfg_fixture, agent_cfg_fixture, namespace=None): - super(LinuxBridgeAgentFixture, self).__init__() + super().__init__() self.env_desc = env_desc self.host_desc = host_desc self.test_name = test_name @@ -206,7 +206,7 @@ class L3AgentFixture(fixtures.Fixture): def __init__(self, env_desc, host_desc, test_name, neutron_cfg_fixture, l3_agent_cfg_fixture, namespace=None): - super(L3AgentFixture, self).__init__() + super().__init__() self.env_desc = env_desc self.host_desc = host_desc self.test_name = test_name diff --git a/neutron_fwaas/tests/fullstack/test_l3_agent.py b/neutron_fwaas/tests/fullstack/test_l3_agent.py index 58d1ea02c..8296db9c3 100644 --- a/neutron_fwaas/tests/fullstack/test_l3_agent.py +++ b/neutron_fwaas/tests/fullstack/test_l3_agent.py @@ -57,7 +57,7 @@ class TestL3Agent(base.BaseFullStackTestCase): for cidr in subnet_cidrs: # For IPv6 subnets, enable_dhcp should be set to true. enable_dhcp = (netaddr.IPNetwork(cidr).version == - constants.IP_VERSION_6) + constants.IP_VERSION_6) subnet = self.safe_client.create_subnet( tenant_id, network['id'], cidr, enable_dhcp=enable_dhcp) @@ -83,7 +83,7 @@ class TestLegacyL3Agent(TestL3Agent): environment.EnvironmentDescription( network_type='vlan', l2_pop=False), host_descriptions) - super(TestLegacyL3Agent, self).setUp(env) + super().setUp(env) def _get_namespace(self, router_id): return namespaces.build_ns_name(l3_agent.NS_PREFIX, router_id) @@ -101,9 +101,9 @@ class TestLegacyL3Agent(TestL3Agent): tenant_id, network['id'], '20.0.0.0/24', gateway_ip='20.0.0.1') self.safe_client.add_router_interface(router['id'], subnet['id']) - namespace = "%s@%s" % ( + namespace = "{}@{}".format( self._get_namespace(router['id']), - self.environment.hosts[0].l3_agent.get_namespace_suffix(), ) + self.environment.hosts[0].l3_agent.get_namespace_suffix()) self._assert_namespace_exists(namespace) def test_east_west_traffic(self): @@ -159,7 +159,7 @@ class TestHAL3Agent(base.BaseFullStackTestCase): environment.EnvironmentDescription( network_type='vxlan', l2_pop=True), host_descriptions) - super(TestHAL3Agent, self).setUp(env) + super().setUp(env) def _is_ha_router_active_on_one_agent(self, router_id): agents = self.client.list_l3_agent_hosting_routers(router_id) diff --git a/neutron_fwaas/tests/functional/db/test_migrations.py b/neutron_fwaas/tests/functional/db/test_migrations.py index c28b894c6..30acc4906 100644 --- a/neutron_fwaas/tests/functional/db/test_migrations.py +++ b/neutron_fwaas/tests/functional/db/test_migrations.py @@ -61,7 +61,7 @@ class TestSanityCheck(testlib_api.SqlTestCaseLight): BUILD_SCHEMA = False def setUp(self): - super(TestSanityCheck, self).setUp() + super().setUp() for conf in migration.get_alembic_configs(): self.alembic_config = conf diff --git a/neutron_fwaas/tests/functional/privileged/test_netlink_lib.py b/neutron_fwaas/tests/functional/privileged/test_netlink_lib.py index 5d34f8bac..d985ce9f8 100644 --- a/neutron_fwaas/tests/functional/privileged/test_netlink_lib.py +++ b/neutron_fwaas/tests/functional/privileged/test_netlink_lib.py @@ -148,7 +148,7 @@ class NetlinkLibTestCaseIPv6(functional_base.BaseSudoTestCase): ) def setUp(self): - super(NetlinkLibTestCaseIPv6, self).setUp() + super().setUp() if not check_nf_conntrack_ipv6_is_loaded(): self.skipTest( "nf_conntrack_ipv6 module wasn't loaded. Please load" diff --git a/neutron_fwaas/tests/functional/privileged/test_utils.py b/neutron_fwaas/tests/functional/privileged/test_utils.py index 3d62d4473..f548ddf72 100644 --- a/neutron_fwaas/tests/functional/privileged/test_utils.py +++ b/neutron_fwaas/tests/functional/privileged/test_utils.py @@ -24,7 +24,7 @@ from neutron_fwaas.privileged.tests.functional import utils class InNamespaceTest(base.BaseSudoTestCase): def setUp(self): - super(InNamespaceTest, self).setUp() + super().setUp() self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name ip = ip_lib.IPWrapper() diff --git a/neutron_fwaas/tests/functional/services/logapi/agents/drivers/iptables/test_log.py b/neutron_fwaas/tests/functional/services/logapi/agents/drivers/iptables/test_log.py index 099ea9ea4..1243ce86b 100644 --- a/neutron_fwaas/tests/functional/services/logapi/agents/drivers/iptables/test_log.py +++ b/neutron_fwaas/tests/functional/services/logapi/agents/drivers/iptables/test_log.py @@ -63,7 +63,7 @@ AGENT_MODE_OPTS = [ class FWLoggingTestBase(framework.L3AgentTestFramework): def setUp(self): - super(FWLoggingTestBase, self).setUp() + super().setUp() self.conf.register_opts(FWAAS_V2_LOG_OPTS, 'fwaas') self.conf.register_opts(AGENT_MODE_OPTS, group='DEFAULT') self._set_agent_mode(self.conf) @@ -123,7 +123,8 @@ class FWLoggingTestBase(framework.L3AgentTestFramework): if not v4rules_in_chain: ipt_mgr.ipv4['filter'].add_chain(chain) if action == 'REJECT': - ipt_mgr.ipv4['filter'].add_rule(chain, + ipt_mgr.ipv4['filter'].add_rule( + chain, '-j REJECT --reject-with icmp-port-unreachable') else: ipt_mgr.ipv4['filter'].add_rule(chain, '-j %s' % action) @@ -133,7 +134,8 @@ class FWLoggingTestBase(framework.L3AgentTestFramework): if not v6rules_in_chain: ipt_mgr.ipv6['filter'].add_chain(chain) if action == 'REJECT': - ipt_mgr.ipv6['filter'].add_rule(chain, + ipt_mgr.ipv6['filter'].add_rule( + chain, '-j REJECT --reject-with icmp6-port-unreachable') else: ipt_mgr.ipv6['filter'].add_rule(chain, '-j %s' % action) diff --git a/neutron_fwaas/tests/unit/cmd/upgrade_checks/test_checks.py b/neutron_fwaas/tests/unit/cmd/upgrade_checks/test_checks.py index 8de1b1ac5..660b85158 100644 --- a/neutron_fwaas/tests/unit/cmd/upgrade_checks/test_checks.py +++ b/neutron_fwaas/tests/unit/cmd/upgrade_checks/test_checks.py @@ -24,7 +24,7 @@ from neutron_fwaas.tests import base class TestChecks(base.BaseTestCase): def setUp(self): - super(TestChecks, self).setUp() + super().setUp() self.checks = checks.Checks() def test_get_checks_list(self): diff --git a/neutron_fwaas/tests/unit/db/firewall/v2/test_firewall_db_v2.py b/neutron_fwaas/tests/unit/db/firewall/v2/test_firewall_db_v2.py index b6ff1ee32..36ccf5e35 100644 --- a/neutron_fwaas/tests/unit/db/firewall/v2/test_firewall_db_v2.py +++ b/neutron_fwaas/tests/unit/db/firewall/v2/test_firewall_db_v2.py @@ -30,7 +30,7 @@ from neutron_fwaas.tests.unit.services.firewall import test_fwaas_plugin_v2 class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): def setUp(self): - super(TestFirewallDBPluginV2, self).setUp() + super().setUp() self.db = self.plugin.driver.firewall_db def test_get_policy_ordered_rules(self): @@ -877,8 +877,8 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): # make sure that admin can see default groups for admin and non-admin res = self._list_req('firewall_groups', ctx=ctx_admin, as_admin=True) self.assertEqual(2, len(res)) - self.assertEqual(set([ctx_admin.tenant_id, ctx.tenant_id]), - set([r['tenant_id'] for r in res])) + self.assertEqual({ctx_admin.tenant_id, ctx.tenant_id}, + {r['tenant_id'] for r in res}) def test_create_default_firewall_group_from_config(self): group = 'default_fwg_rules' @@ -1026,10 +1026,10 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): description = "my_firewall1" not_found_fwp_id = uuidutils.generate_uuid() self._create_firewall_group(fmt, fwg_name, - description, not_found_fwp_id, - not_found_fwp_id, ports=None, - admin_state_up=self.ADMIN_STATE_UP, - expected_res_status=404) + description, not_found_fwp_id, + not_found_fwp_id, ports=None, + admin_state_up=self.ADMIN_STATE_UP, + expected_res_status=404) def test_create_firewall_group_with_fwp_on_different_tenant(self): fmt = self.fmt @@ -1041,9 +1041,9 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): ctx = self._get_nonadmin_context() self._create_firewall_group(fmt, fwg_name, description, - ingress_firewall_policy_id=fwp_id, + ingress_firewall_policy_id=fwp_id, egress_firewall_policy_id=fwp_id, - context=ctx, + context=ctx, expected_res_status=404) def test_create_firewall_group_with_admin_and_fwp_different_tenant(self): @@ -1685,8 +1685,8 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): as_admin=True) as fwp: fwp_id = fwp['firewall_policy']['id'] fwr_id = fwr['firewall_rule']['id'] - msg = "Firewall rule {0} is not associated with " \ - "firewall policy {1}.".format(fwr_id, fwp_id) + msg = "Firewall rule {} is not associated with " \ + "firewall policy {}.".format(fwr_id, fwp_id) result = self._rule_action( 'remove', fwp_id, fwr_id, insert_before=None, diff --git a/neutron_fwaas/tests/unit/privileged/netfilter_log/test_libnetfilter_log.py b/neutron_fwaas/tests/unit/privileged/netfilter_log/test_libnetfilter_log.py index 763b83b84..4cb19cefc 100644 --- a/neutron_fwaas/tests/unit/privileged/netfilter_log/test_libnetfilter_log.py +++ b/neutron_fwaas/tests/unit/privileged/netfilter_log/test_libnetfilter_log.py @@ -35,7 +35,7 @@ class NFLogAppTestCase(base.BaseTestCase): self.nflog_app = lib_log.NFLogApp() self.spawn = mock.patch('eventlet.spawn').start() - super(NFLogAppTestCase, self).setUp() + super().setUp() def test_register_packet_handler(self): def fake_method(): @@ -55,7 +55,7 @@ class NFLogAppTestCase(base.BaseTestCase): class NFLogWrapper(base.BaseTestCase): def setUp(self): - super(NFLogWrapper, self).setUp() + super().setUp() lib_log.libnflog = mock.Mock() lib_log.ffi = mock.Mock() diff --git a/neutron_fwaas/tests/unit/privileged/test_netlink_lib.py b/neutron_fwaas/tests/unit/privileged/test_netlink_lib.py index 9e96cd4be..4f538f96b 100644 --- a/neutron_fwaas/tests/unit/privileged/test_netlink_lib.py +++ b/neutron_fwaas/tests/unit/privileged/test_netlink_lib.py @@ -40,7 +40,7 @@ FAKE_ICMPV6_ENTRY = {'ipversion': 6, 'protocol': 'ipv6-icmp', class NetlinkLibTestCase(base.BaseTestCase): def setUp(self): - super(NetlinkLibTestCase, self).setUp() + super().setUp() nl_lib.nfct = mock.Mock() nl_lib.libc = mock.Mock() @@ -250,8 +250,8 @@ class NetlinkLibTestCase(base.BaseTestCase): with nl_lib.ConntrackManager() as conntrack: nl_lib.nfct.nfct_open.assert_called_once() conntrack.delete_entries([FAKE_ENTRY, - FAKE_TCP_ENTRY, - FAKE_UDP_ENTRY]) + FAKE_TCP_ENTRY, + FAKE_UDP_ENTRY]) calls = [ mock.call(conntrack_filter, nl_constants.ATTR_L3PROTO, diff --git a/neutron_fwaas/tests/unit/privileged/test_utils.py b/neutron_fwaas/tests/unit/privileged/test_utils.py index 4b3a4dad4..5d84baf49 100644 --- a/neutron_fwaas/tests/unit/privileged/test_utils.py +++ b/neutron_fwaas/tests/unit/privileged/test_utils.py @@ -27,7 +27,7 @@ class InNamespaceTest(base.BaseTestCase): NEW_NETNS = 'newns' def setUp(self): - super(InNamespaceTest, self).setUp() + super().setUp() # NOTE(cby): we should unmock os.open/close as early as possible # because there are used in cleanups diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/noop/test_noop_driver.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/noop/test_noop_driver.py index d23ef36a9..c41575bd0 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/noop/test_noop_driver.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/noop/test_noop_driver.py @@ -23,7 +23,7 @@ from neutron_fwaas.tests import base class TestNoopDriver(base.BaseTestCase): def setUp(self): - super(TestNoopDriver, self).setUp() + super().setUp() mock_br = mock.Mock() self.firewall = noop_driver.NoopFirewallL2Driver(mock_br) diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/test_firewall.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/test_firewall.py index dad920423..e2e76c0c1 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/test_firewall.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/test_firewall.py @@ -48,15 +48,15 @@ class TestCreateRegNumbers(base.BaseTestCase): def test_both_registers_defined(self): flow = {'foo': 'bar', 'reg_port': 1, 'reg_net': 2} expected_flow = {'foo': 'bar', - 'reg{:d}'.format(fwaas_ovs_consts.REG_PORT): 1, - 'reg{:d}'.format(fwaas_ovs_consts.REG_NET): 2} + f'reg{fwaas_ovs_consts.REG_PORT:d}': 1, + f'reg{fwaas_ovs_consts.REG_NET:d}': 2} ovsfw.create_reg_numbers(flow) self.assertEqual(expected_flow, flow) class TestFirewallGroup(base.BaseTestCase): def setUp(self): - super(TestFirewallGroup, self).setUp() + super().setUp() self.fwg = ovsfw.FirewallGroup('123') self.fwg.members = {'type': [1, 2, 3, 4]} @@ -103,7 +103,7 @@ class TestFirewallGroup(base.BaseTestCase): class TestOFPort(base.BaseTestCase): def setUp(self): - super(TestOFPort, self).setUp() + super().setUp() self.ipv4_addresses = ['10.0.0.1', '192.168.0.1'] self.ipv6_addresses = ['fe80::f816:3eff:fe2e:1'] port_dict = {'device': 1, @@ -165,7 +165,7 @@ class TestOFPort(base.BaseTestCase): class TestFWGPortMap(base.BaseTestCase): def setUp(self): - super(TestFWGPortMap, self).setUp() + super().setUp() self.map = ovsfw.FWGPortMap() def test_get_or_create_fwg_existing_fwg(self): @@ -250,7 +250,7 @@ class TestFWGPortMap(base.BaseTestCase): self.map.update_members(42, []) -class FakeOVSPort(object): +class FakeOVSPort: def __init__(self, name, port, mac): self.port_name = name self.ofport = port @@ -259,7 +259,7 @@ class FakeOVSPort(object): class TestOVSFirewallDriver(base.BaseTestCase): def setUp(self): - super(TestOVSFirewallDriver, self).setUp() + super().setUp() self._mock_ovs_br = mock.patch.object( ovs_lib, 'OVSBridge', autospec=True) mock_bridge = self._mock_ovs_br.start() @@ -319,8 +319,8 @@ class TestOVSFirewallDriver(base.BaseTestCase): def test__add_flow_registers_are_replaced(self): self.firewall._add_flow(in_port=1, reg_port=1, reg_net=2) expected_calls = {'in_port': 1, - 'reg{:d}'.format(fwaas_ovs_consts.REG_PORT): 1, - 'reg{:d}'.format(fwaas_ovs_consts.REG_NET): 2} + f'reg{fwaas_ovs_consts.REG_PORT:d}': 1, + f'reg{fwaas_ovs_consts.REG_NET:d}': 2} self.mock_bridge.br.add_flow.assert_called_once_with( **expected_calls) @@ -444,7 +444,7 @@ class TestOVSFirewallDriver(base.BaseTestCase): 'output:{:d},resubmit(,{:d})'.format( self.port_ofport, ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE), - dl_type="0x{:04x}".format(constants.ETHERTYPE_IP), + dl_type=f"0x{constants.ETHERTYPE_IP:04x}", nw_proto=constants.PROTO_NUM_TCP, priority=70, reg5=self.port_ofport, @@ -482,8 +482,8 @@ class TestOVSFirewallDriver(base.BaseTestCase): priority=95, table=ovs_consts.TRANSIENT_TABLE) filter_rule = mock.call( - actions='resubmit(,{:d})'.format(ovs_consts.RULES_INGRESS_TABLE), - dl_type="0x{:04x}".format(constants.ETHERTYPE_IP), + actions=f'resubmit(,{ovs_consts.RULES_INGRESS_TABLE:d})', + dl_type=f"0x{constants.ETHERTYPE_IP:04x}", nw_proto=constants.PROTO_NUM_TCP, priority=70, reg5=self.port_ofport, @@ -529,7 +529,7 @@ class TestOVSFirewallDriver(base.BaseTestCase): mock.call( actions='resubmit(,{:d})'.format( fwaas_ovs_consts.FW_ACCEPT_OR_INGRESS_TABLE), - dl_type="0x{:04x}".format(constants.ETHERTYPE_IP), + dl_type=f"0x{constants.ETHERTYPE_IP:04x}", nw_proto=constants.PROTO_NUM_UDP, priority=71, ct_state=fwaas_ovs_consts.OF_STATE_NEW_NOT_ESTABLISHED, @@ -567,7 +567,7 @@ class TestOVSFirewallDriver(base.BaseTestCase): mock.call( actions='resubmit(,{:d})'.format( ovs_consts.RULES_EGRESS_TABLE), - dl_type="0x{:04x}".format(constants.ETHERTYPE_IP), + dl_type=f"0x{constants.ETHERTYPE_IP:04x}", nw_proto=constants.PROTO_NUM_UDP, priority=71, ct_state=fwaas_ovs_consts.OF_STATE_NEW_NOT_ESTABLISHED, @@ -638,7 +638,7 @@ class TestOVSFirewallDriver(base.BaseTestCase): self._prepare_firewall_group() self.firewall.fwg_to_delete = {1} with mock.patch.object(self.firewall.fwg_port_map, - 'delete_fwg') as delete_fwg_mock: + 'delete_fwg') as delete_fwg_mock: self.firewall._cleanup_stale_fwg() delete_fwg_mock.assert_called_once_with(1) diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/test_rules.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/test_rules.py index 968d45965..f5d5337ca 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/test_rules.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/l2/openvswitch_firewall/test_rules.py @@ -52,7 +52,7 @@ class TestIsValidPrefix(base.BaseTestCase): class TestCreateFlowsFromRuleAndPort(base.BaseTestCase): def setUp(self): - super(TestCreateFlowsFromRuleAndPort, self).setUp() + super().setUp() ovs_port = mock.Mock(vif_mac='00:00:00:00:00:00') ovs_port.ofport = 1 port_dict = {'device': 'port_id'} @@ -169,7 +169,7 @@ class TestCreateFlowsFromRuleAndPort(base.BaseTestCase): class TestCreateProtocolFlows(base.BaseTestCase): def setUp(self): - super(TestCreateProtocolFlows, self).setUp() + super().setUp() ovs_port = mock.Mock(vif_mac='00:00:00:00:00:00') ovs_port.ofport = 1 port_dict = {'device': 'port_id'} diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_iptables_fwaas_v2.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_iptables_fwaas_v2.py index 9f62e95b4..0fcc8be9d 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_iptables_fwaas_v2.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_iptables_fwaas_v2.py @@ -38,7 +38,7 @@ MAX_INTF_NAME_LEN = 14 class IptablesFwaasTestCase(base.BaseTestCase): def setUp(self): - super(IptablesFwaasTestCase, self).setUp() + super().setUp() self.iptables_cls_p = mock.patch( 'neutron.agent.linux.iptables_manager.IptablesManager') self.iptables_cls_p.start() @@ -124,7 +124,7 @@ class IptablesFwaasTestCase(base.BaseTestCase): return fw_inst def _fake_apply_list(self, router_count=1, distributed=False, - distributed_mode=None): + distributed_mode=None): apply_list = [] while router_count > 0: iptables_inst = mock.Mock() @@ -149,12 +149,13 @@ class IptablesFwaasTestCase(base.BaseTestCase): return apply_list def _get_intf_name(self, if_prefix, port_id): - _name = "%s%s" % (if_prefix, port_id) + _name = "{}{}".format(if_prefix, port_id) return _name[:MAX_INTF_NAME_LEN] def _setup_firewall_with_rules(self, func, router_count=1, - distributed=False, distributed_mode=None): - apply_list = self._fake_apply_list(router_count=router_count, + distributed=False, distributed_mode=None): + apply_list = self._fake_apply_list( + router_count=router_count, distributed=distributed, distributed_mode=distributed_mode) rule_list = self._fake_rules_v4(FAKE_FW_ID, apply_list) firewall = self._fake_firewall(rule_list) @@ -178,8 +179,8 @@ class IptablesFwaasTestCase(base.BaseTestCase): rule3 = '-p tcp -m tcp --dport 23 -j %s' % rejected ingress_chain = 'iv4%s' % firewall['id'] egress_chain = 'ov4%s' % firewall['id'] - ipt_mgr_ichain = '%s-%s' % (binary_name, ingress_chain[:11]) - ipt_mgr_echain = '%s-%s' % (binary_name, egress_chain[:11]) + ipt_mgr_ichain = '{}-{}'.format(binary_name, ingress_chain[:11]) + ipt_mgr_echain = '{}-{}'.format(binary_name, egress_chain[:11]) for router_info_inst, port_ids in apply_list: v4filter_inst = router_info_inst.iptables_manager.ipv4['filter'] calls = [mock.call.remove_chain('iv4fake-fw-uuid'), @@ -204,24 +205,29 @@ class IptablesFwaasTestCase(base.BaseTestCase): for port in FAKE_PORT_IDS: intf_name = self._get_intf_name(if_prefix, port) - calls.append(mock.call.add_rule('FORWARD', - '-o %s -j %s' % (intf_name, ipt_mgr_ichain))) + calls.append(mock.call.add_rule( + 'FORWARD', + '-o {} -j {}'.format(intf_name, ipt_mgr_ichain))) for port in FAKE_PORT_IDS: intf_name = self._get_intf_name(if_prefix, port) - calls.append(mock.call.add_rule('FORWARD', - '-i %s -j %s' % (intf_name, ipt_mgr_echain))) + calls.append(mock.call.add_rule( + 'FORWARD', + '-i {} -j {}'.format(intf_name, ipt_mgr_echain))) for direction in ['o', 'i']: for port_id in FAKE_PORT_IDS: intf_name = self._get_intf_name(if_prefix, port_id) - calls.append(mock.call.add_rule('FORWARD', - '-%s %s -j %s-fwaas-defau' % (direction, - intf_name, binary_name))) + calls.append(mock.call.add_rule( + 'FORWARD', + '-{} {} -j {}-fwaas-defau'.format( + direction, intf_name, binary_name))) v4filter_inst.assert_has_calls(calls) - def _setup_firewall_with_rules_v6(self, func, router_count=1, + def _setup_firewall_with_rules_v6( + self, func, router_count=1, distributed=False, distributed_mode=None): - apply_list = self._fake_apply_list(router_count=router_count, + apply_list = self._fake_apply_list( + router_count=router_count, distributed=distributed, distributed_mode=distributed_mode) rule_list = self._fake_rules_v6(FAKE_FW_ID, apply_list) firewall = self._fake_firewall(rule_list) @@ -242,8 +248,8 @@ class IptablesFwaasTestCase(base.BaseTestCase): rule1 = '-p ipv6-icmp -d 2001:db8::2/128 -j %s' % accepted ingress_chain = 'iv6%s' % firewall['id'] egress_chain = 'ov6%s' % firewall['id'] - ipt_mgr_ichain = '%s-%s' % (binary_name, ingress_chain[:11]) - ipt_mgr_echain = '%s-%s' % (binary_name, egress_chain[:11]) + ipt_mgr_ichain = '{}-{}'.format(binary_name, ingress_chain[:11]) + ipt_mgr_echain = '{}-{}'.format(binary_name, egress_chain[:11]) for router_info_inst, port_ids in apply_list: v6filter_inst = router_info_inst.iptables_manager.ipv6['filter'] calls = [mock.call.remove_chain('iv6fake-fw-uuid'), @@ -263,19 +269,23 @@ class IptablesFwaasTestCase(base.BaseTestCase): ] for port in FAKE_PORT_IDS: intf_name = self._get_intf_name(if_prefix, port) - calls.append(mock.call.add_rule('FORWARD', - '-o %s -j %s' % (intf_name, ipt_mgr_ichain))) + calls.append(mock.call.add_rule( + 'FORWARD', + '-o {} -j {}'.format(intf_name, ipt_mgr_ichain))) for port in FAKE_PORT_IDS: intf_name = self._get_intf_name(if_prefix, port) - calls.append(mock.call.add_rule('FORWARD', - '-i %s -j %s' % (intf_name, ipt_mgr_echain))) + calls.append(mock.call.add_rule( + 'FORWARD', + '-i {} -j {}'.format(intf_name, ipt_mgr_echain))) for direction in ['o', 'i']: for port_id in FAKE_PORT_IDS: intf_name = self._get_intf_name(if_prefix, port_id) - calls.append(mock.call.add_rule('FORWARD', - '-%s %s -j %s-fwaas-defau' % (direction, - intf_name, binary_name))) + calls.append(mock.call.add_rule( + 'FORWARD', + '-{} {} -j {}-fwaas-defau'.format(direction, + intf_name, + binary_name))) v6filter_inst.assert_has_calls(calls) @@ -289,8 +299,8 @@ class IptablesFwaasTestCase(base.BaseTestCase): invalid_rule = '-m state --state INVALID -j %s' % dropped est_rule = '-m state --state RELATED,ESTABLISHED -j ACCEPT' for ip_version in (4, 6): - ingress_chain = ('iv%s%s' % (ip_version, firewall['id'])) - egress_chain = ('ov%s%s' % (ip_version, firewall['id'])) + ingress_chain = ('iv{}{}'.format(ip_version, firewall['id'])) + egress_chain = ('ov{}{}'.format(ip_version, firewall['id'])) calls = [mock.call.remove_chain( 'iv%sfake-fw-uuid' % ip_version), mock.call.remove_chain( @@ -308,10 +318,11 @@ class IptablesFwaasTestCase(base.BaseTestCase): for port_id in FAKE_PORT_IDS: for direction in ['o', 'i']: - mock.call.add_rule('FORWARD', - '-%s qr-%s -j %s-fwaas-defau' % (port_id, - direction, - binary_name)) + mock.call.add_rule( + 'FORWARD', + '-{} qr-{} -j {}-fwaas-defau'.format(port_id, + direction, + binary_name)) if ip_version == 4: v4filter_inst = first_ri.iptables_manager.ipv4['filter'] v4filter_inst.assert_has_calls(calls) @@ -377,19 +388,23 @@ class IptablesFwaasTestCase(base.BaseTestCase): first_ri.iptables_manager.ipv4['filter'].assert_has_calls(calls) def test_create_firewall_group_with_rules_dvr_snat(self): - self._setup_firewall_with_rules(self.firewall.create_firewall_group, + self._setup_firewall_with_rules( + self.firewall.create_firewall_group, distributed=True, distributed_mode='dvr_snat') def test_update_firewall_group_with_rules_dvr_snat(self): - self._setup_firewall_with_rules(self.firewall.update_firewall_group, + self._setup_firewall_with_rules( + self.firewall.update_firewall_group, distributed=True, distributed_mode='dvr_snat') def test_create_firewall_group_with_rules_dvr(self): - self._setup_firewall_with_rules(self.firewall.create_firewall_group, + self._setup_firewall_with_rules( + self.firewall.create_firewall_group, distributed=True, distributed_mode='dvr') def test_update_firewall_group_with_rules_dvr(self): - self._setup_firewall_with_rules(self.firewall.update_firewall_group, + self._setup_firewall_with_rules( + self.firewall.update_firewall_group, distributed=True, distributed_mode='dvr') def test_remove_conntrack_new_firewall(self): @@ -408,10 +423,10 @@ class IptablesFwaasTestCase(base.BaseTestCase): self.firewall.create_firewall_group(FW_LEGACY, apply_list, firewall) self.firewall.pre_firewall = dict(firewall) insert_rule = {'enabled': True, - 'action': 'deny', - 'ip_version': 4, - 'protocol': 'icmp', - 'id': 'fake-fw-rule'} + 'action': 'deny', + 'ip_version': 4, + 'protocol': 'icmp', + 'id': 'fake-fw-rule'} rule_list.insert(2, insert_rule) firewall = self._fake_firewall(rule_list) self.firewall.update_firewall_group(FW_LEGACY, apply_list, firewall) @@ -492,10 +507,10 @@ class IptablesFwaasTestCase(base.BaseTestCase): firewall = self._fake_firewall(rule_list) self.firewall.create_firewall_group(FW_LEGACY, apply_list, firewall) income_rule = {'enabled': True, - 'action': 'deny', - 'ip_version': 4, - 'protocol': 'tcp', - 'id': 'fake-fw-rule3'} + 'action': 'deny', + 'ip_version': 4, + 'protocol': 'tcp', + 'id': 'fake-fw-rule3'} rule_list[2] = income_rule firewall = self._fake_firewall(rule_list) self.firewall.update_firewall_group(FW_LEGACY, apply_list, firewall) diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_legacy_conntrack.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_legacy_conntrack.py index 947443a71..fa2bbb5a0 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_legacy_conntrack.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_legacy_conntrack.py @@ -96,7 +96,7 @@ ROUTER_NAMESPACE = 'qrouter-fake-namespace' class ConntrackLegacyTestCase(base.BaseTestCase): def setUp(self): - super(ConntrackLegacyTestCase, self).setUp() + super().setUp() self.utils_exec = mock.Mock() self.conntrack_driver = legacy_conntrack.ConntrackLegacy() self.conntrack_driver.initialize(execute=self.utils_exec) @@ -135,7 +135,7 @@ class ConntrackLegacyTestCase(base.BaseTestCase): self.conntrack_driver._execute_command = mock.Mock( side_effect=get_contrack_entries) entries = self.conntrack_driver.list_entries(ROUTER_NAMESPACE) - protocols = set([entry[1] for entry in entries]) + protocols = {entry[1] for entry in entries} supported_protocols = set(legacy_conntrack.ATTR_POSITIONS.keys()) self.assertTrue(protocols.issubset(supported_protocols)) diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_netlink_conntrack.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_netlink_conntrack.py index 6265298ce..a0a934f9e 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_netlink_conntrack.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/drivers/linux/test_netlink_conntrack.py @@ -69,7 +69,7 @@ ROUTER_NAMESPACE = 'qrouter-fake-namespace' class ConntrackNetlinkTestCase(base.BaseTestCase): def setUp(self): - super(ConntrackNetlinkTestCase, self).setUp() + super().setUp() self.conntrack_driver = netlink_conntrack.ConntrackNetlink() self.conntrack_driver.initialize() nl_flush_entries = mock.patch('neutron_fwaas.privileged.' @@ -79,7 +79,7 @@ class ConntrackNetlinkTestCase(base.BaseTestCase): 'netlink_lib.list_entries') self.list_entries = nl_list_entries.start() nl_delete_entries = mock.patch('neutron_fwaas.privileged.' - 'netlink_lib.delete_entries') + 'netlink_lib.delete_entries') self.delete_entries = nl_delete_entries.start() def test_flush_entries(self): diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l2/fake_data.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l2/fake_data.py index de0dfa39b..2807fd5aa 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l2/fake_data.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l2/fake_data.py @@ -34,10 +34,10 @@ PORT4 = uuidutils.generate_uuid() HOST = 'fake_host' -class FakeFWaaSL2Agent(object): +class FakeFWaaSL2Agent: def __init__(self): - super(FakeFWaaSL2Agent, self).__init__() + super().__init__() def create(self, resource, attrs=None, minimal=False): """Create a fake fwaas v2 resources diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l2/test_fwaas_v2.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l2/test_fwaas_v2.py index 5d9cfc22e..d124331b9 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l2/test_fwaas_v2.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l2/test_fwaas_v2.py @@ -31,7 +31,7 @@ from neutron_fwaas.tests.unit.services.firewall.service_drivers.agents.l2\ class TestFWaasV2AgentExtensionBase(base.BaseTestCase): def setUp(self): - super(TestFWaasV2AgentExtensionBase, self).setUp() + super().setUp() self.fake = fake_data.FakeFWaaSL2Agent() self.port = self.fake.create('port') @@ -60,7 +60,7 @@ class TestFWaasV2AgentExtensionBase(base.BaseTestCase): class TestFWaasV2AgentExtension(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestFWaasV2AgentExtension, self).setUp() + super().setUp() cfg.CONF.set_override('firewall_l2_driver', 'ovs', group='fwaas') def test_initialize(self): @@ -77,7 +77,7 @@ class TestFWaasV2AgentExtension(TestFWaasV2AgentExtensionBase): class TestHandlePort(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestHandlePort, self).setUp() + super().setUp() self.rpc.get_firewall_group_for_port = mock.Mock( return_value=self.fwg) self.l2._compute_status = mock.Mock(return_value=nl_consts.ACTIVE) @@ -154,7 +154,7 @@ class TestHandlePort(TestFWaasV2AgentExtensionBase): class TestDeletePort(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestDeletePort, self).setUp() + super().setUp() self.l2._compute_status = mock.Mock(return_value=nl_consts.ACTIVE) self.l2._apply_fwg_rules = mock.Mock(return_value=True) self.l2._send_fwg_status = mock.Mock() @@ -222,7 +222,7 @@ class TestDeletePort(TestFWaasV2AgentExtensionBase): class TestCreateFirewallGroup(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestCreateFirewallGroup, self).setUp() + super().setUp() self.l2._apply_fwg_rules = mock.Mock(return_value=True) self.l2._compute_status = mock.Mock(return_value='ACTIVE') self.l2._send_fwg_status = mock.Mock() @@ -304,7 +304,7 @@ class TestCreateFirewallGroup(TestFWaasV2AgentExtensionBase): class TestDeleteFirewallGroup(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestDeleteFirewallGroup, self).setUp() + super().setUp() self.l2._apply_fwg_rules = mock.Mock(return_value=True) self.l2._compute_status = mock.Mock(return_value='ACTIVE') self.l2._send_fwg_status = mock.Mock() @@ -360,7 +360,7 @@ class TestDeleteFirewallGroup(TestFWaasV2AgentExtensionBase): class TestUpdateFirewallGroup(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestUpdateFirewallGroup, self).setUp() + super().setUp() self.l2._delete_firewall_group = mock.Mock() self.l2._create_firewall_group = mock.Mock() self.l2._send_fwg_status = mock.Mock() @@ -394,20 +394,20 @@ class TestUpdateFirewallGroup(TestFWaasV2AgentExtensionBase): class TestIsPortLayer2(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestIsPortLayer2, self).setUp() + super().setUp() def test_vm_port(self): self.assertTrue(self.l2._is_port_layer2(self.port)) def test_not_vm_port(self): for device_owner in [nl_consts.DEVICE_OWNER_ROUTER_INTF, - nl_consts.DEVICE_OWNER_ROUTER_GW, - nl_consts.DEVICE_OWNER_DHCP, - nl_consts.DEVICE_OWNER_DVR_INTERFACE, - nl_consts.DEVICE_OWNER_AGENT_GW, - nl_consts.DEVICE_OWNER_ROUTER_SNAT, - 'unknown device_owner', - '']: + nl_consts.DEVICE_OWNER_ROUTER_GW, + nl_consts.DEVICE_OWNER_DHCP, + nl_consts.DEVICE_OWNER_DVR_INTERFACE, + nl_consts.DEVICE_OWNER_AGENT_GW, + nl_consts.DEVICE_OWNER_ROUTER_SNAT, + 'unknown device_owner', + '']: self.port['device_owner'] = device_owner self.assertFalse(self.l2._is_port_layer2(self.port)) @@ -419,7 +419,7 @@ class TestIsPortLayer2(TestFWaasV2AgentExtensionBase): class TestComputeStatus(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestComputeStatus, self).setUp() + super().setUp() self.ports = list(self.fwg_with_rule['port_details'].values()) def test_normal(self): @@ -530,9 +530,9 @@ class TestComputeStatus(TestFWaasV2AgentExtensionBase): class TestApplyFwgRules(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestApplyFwgRules, self).setUp() + super().setUp() - class DummyVlan(object): + class DummyVlan: def __init__(self, vlan=None): self.vlan = vlan @@ -591,7 +591,7 @@ class TestApplyFwgRules(TestFWaasV2AgentExtensionBase): class TestSendFwgStatus(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestSendFwgStatus, self).setUp() + super().setUp() self.rpc.set_firewall_group_status = mock.Mock() def test_success(self): @@ -607,9 +607,9 @@ class TestSendFwgStatus(TestFWaasV2AgentExtensionBase): class TestAddLocalVlanToPorts(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestAddLocalVlanToPorts, self).setUp() + super().setUp() - class DummyVlan(object): + class DummyVlan: def __init__(self, vlan=None): self.vlan = vlan @@ -671,7 +671,7 @@ class TestAddLocalVlanToPorts(TestFWaasV2AgentExtensionBase): class TestFWaaSL2PluginApi(TestFWaasV2AgentExtensionBase): def setUp(self): - super(TestFWaaSL2PluginApi, self).setUp() + super().setUp() self.plugin = fwaas_v2.FWaaSL2PluginApi( consts.FIREWALL_PLUGIN, self.host) @@ -710,7 +710,7 @@ class TestFWaaSL2PluginApi(TestFWaasV2AgentExtensionBase): class TestPortFirewallGroupMap(base.BaseTestCase): def setUp(self): - super(TestPortFirewallGroupMap, self).setUp() + super().setUp() self.fake = fake_data.FakeFWaaSL2Agent() self.map = fwaas_v2.PortFirewallGroupMap() self.fwg = self.fake.create('fwg') diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l3reference/test_firewall_l3_agent_v2.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l3reference/test_firewall_l3_agent_v2.py index 9582742da..358be24fe 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l3reference/test_firewall_l3_agent_v2.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/l3reference/test_firewall_l3_agent_v2.py @@ -35,7 +35,7 @@ from neutron_fwaas.tests.unit.services.firewall.service_drivers.agents \ import test_firewall_agent_api -class FWaasHelper(object): +class FWaasHelper: def __init__(self): pass @@ -62,7 +62,7 @@ def _setup_test_agent_class(service_plugins): self.event_observers = mock.Mock() self.conf = conf firewall_agent_api._check_required_agent_extension = mock.Mock() - super(FWaasTestAgent, self).__init__(conf) + super().__init__(conf) def delete_router(self, context, data): pass @@ -72,7 +72,7 @@ def _setup_test_agent_class(service_plugins): class TestFWaaSL3AgentExtension(base.BaseTestCase): def setUp(self): - super(TestFWaaSL3AgentExtension, self).setUp() + super().setUp() self.conf = cfg.ConfigOpts() self.conf.register_opts(l3_config.OPTS) @@ -129,10 +129,10 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase): mock_driver_create_firewall_group.return_value = True self.api.create_firewall_group(self.context, firewall_group, - host='host') + host='host') - mock_get_firewall_group_ports.assert_called_once_with(self.context, - firewall_group) + mock_get_firewall_group_ports.assert_called_once_with( + self.context, firewall_group) mock_get_in_ns_ports.assert_called assert mock_get_in_ns_ports mock_set_firewall_group_status.assert_called_once_with( @@ -145,7 +145,7 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase): 'add-port-ids': [1, 2], 'del-port-ids': [3, 4], 'router_ids': [], - 'last-port': False} + 'last-port': False} self.api.plugin_rpc = mock.Mock() with mock.patch.object(self.api, '_get_firewall_group_ports' @@ -170,10 +170,10 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase): mock.call(self.context, firewall_group)] self.api.update_firewall_group(self.context, firewall_group, - host='host') + host='host') self.assertEqual(mock_get_firewall_group_ports.call_args_list, - calls) + calls) mock_get_in_ns_ports.assert_called mock_set_firewall_group_status.assert_called_once_with( self.context, firewall_group['id'], 'ACTIVE') @@ -202,7 +202,7 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase): mock_driver_update_firewall_group.return_value = True self.api.update_firewall_group(self.context, firewall_group, - host='host') + host='host') mock_get_firewall_group_ports.assert_called mock_get_in_ns_ports.assert_called @@ -232,7 +232,7 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase): mock_driver_delete_firewall_group.return_value = True self.api.update_firewall_group(self.context, firewall_group, - host='host') + host='host') calls = [ mock.call._get_firewall_group_ports( self.context, firewall_group, require_new_plugin=True, @@ -268,7 +268,7 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase): mock_driver_update_firewall_group.return_value = True self.api.update_firewall_group(self.context, firewall_group, - host='host') + host='host') calls = [ mock.call._get_firewall_group_ports( self.context, firewall_group, require_new_plugin=True, @@ -304,7 +304,7 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase): mock_driver_update_firewall_group.return_value = True self.api.update_firewall_group(self.context, firewall_group, - host='host') + host='host') calls = [ mock.call._get_firewall_group_ports( self.context, firewall_group, require_new_plugin=True, @@ -413,13 +413,13 @@ class TestFWaaSL3AgentExtension(base.BaseTestCase): mock_driver_delete_firewall_group.return_value = True self.api.delete_firewall_group(self.context, firewall_group, - host='host') + host='host') mock_get_firewall_group_ports.assert_called_once_with( self.context, firewall_group, to_delete=True) mock_get_in_ns_ports.assert_called - mock_firewall_group_deleted.assert_called_once_with(self.context, - firewall_group['id']) + mock_firewall_group_deleted.assert_called_once_with( + self.context, firewall_group['id']) def _prepare_router_data(self): return router_info.RouterInfo(self.api, diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_agents.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_agents.py index 258305736..c23379240 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_agents.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_agents.py @@ -60,7 +60,7 @@ class FakeAgentApi(agents.FirewallAgentCallbacks): class TestFirewallAgentApi(base.BaseTestCase): def setUp(self): - super(TestFirewallAgentApi, self).setUp() + super().setUp() self.api = agents.FirewallAgentApi('topic', 'host') @@ -109,7 +109,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, 'TestL3NatServicePlugin') l3_plugin = {'l3_plugin_name': l3_plugin_str} common_conf.register_core_common_config_opts(cfg=cfg.CONF) - super(TestAgentDriver, self).setUp( + super().setUp( service_provider=FIREWALL_AGENT_PLUGIN_KLASS, extra_service_plugins=l3_plugin, extra_extension_paths=neutron_extensions.__path__) @@ -132,7 +132,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, def _get_test_firewall_group_attrs(self, name, status=nl_constants.INACTIVE): - return super(TestAgentDriver, self)._get_test_firewall_group_attrs( + return super()._get_test_firewall_group_attrs( name, status=status) def test_set_firewall_group_status(self): @@ -144,13 +144,13 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, admin_state_up=self.ADMIN_STATE_UP ) as fwg: fwg_id = fwg['firewall_group']['id'] - res = self.callbacks.set_firewall_group_status(ctx, fwg_id, - nl_constants.ACTIVE) + res = self.callbacks.set_firewall_group_status( + ctx, fwg_id, nl_constants.ACTIVE) fwg_db = self.plugin.get_firewall_group(ctx, fwg_id) self.assertEqual(nl_constants.ACTIVE, fwg_db['status']) self.assertTrue(res) - res = self.callbacks.set_firewall_group_status(ctx, fwg_id, - nl_constants.ERROR) + res = self.callbacks.set_firewall_group_status( + ctx, fwg_id, nl_constants.ERROR) fwg_db = self.plugin.get_firewall_group(ctx, fwg_id) self.assertEqual(nl_constants.ERROR, fwg_db['status']) self.assertFalse(res) @@ -247,12 +247,12 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, egress_firewall_policy_id=fwp_id, admin_state_up=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) def test_create_firewall_group_with_ports(self): """neutron firewall_group create test-policy """ with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id) as r, \ + tenant_id=self._tenant_id) as r, \ self.subnet() as s1, \ self.subnet(cidr='20.0.0.0/24') as s2: @@ -278,12 +278,12 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, egress_firewall_policy_id=fwp_id, ports=fwg_ports, admin_state_up=True) as fwg1: self.assertEqual(nl_constants.PENDING_CREATE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) def test_create_firewall_group_with_ports_on_diff_routers(self): """neutron firewall_group create test-policy """ with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id) as r, \ + tenant_id=self._tenant_id) as r, \ self.subnet() as s1, \ self.subnet(cidr='20.0.0.0/24') as s2: body = self._router_interface_action( @@ -300,7 +300,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, port_id2 = body['port_id'] with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id) as r2, \ + tenant_id=self._tenant_id) as r2, \ self.subnet() as s3: body = self._router_interface_action( @@ -320,12 +320,12 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, ports=fwg_ports, admin_state_up=True) as fwg1: self.assertEqual(nl_constants.PENDING_CREATE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) def test_create_firewall_group_with_ports_no_policy(self): """neutron firewall_group create test-policy """ with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id) as r, \ + tenant_id=self._tenant_id) as r, \ self.subnet() as s1, \ self.subnet(cidr='20.0.0.0/24') as s2: @@ -349,12 +349,12 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, ports=fwg_ports, admin_state_up=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) def test_update_firewall_group_with_new_ports_no_policy(self): """neutron firewall_group create test-policy """ with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id) as r, \ + tenant_id=self._tenant_id) as r, \ self.subnet() as s1, \ self.subnet(cidr='20.0.0.0/24') as s2, \ self.subnet(cidr='30.0.0.0/24') as s3: @@ -387,7 +387,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, ports=fwg_ports, admin_state_up=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) data = {'firewall_group': {'ports': [port_id2, port_id3]}} req = self.new_update_request('firewall_groups', data, fwg1['firewall_group']['id'], @@ -404,7 +404,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, def test_update_firewall_group_with_new_ports_status_pending(self): """neutron firewall_group create test-policy """ with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id) as r, \ + tenant_id=self._tenant_id) as r, \ self.subnet() as s1, \ self.subnet(cidr='20.0.0.0/24') as s2, \ self.subnet(cidr='30.0.0.0/24') as s3: @@ -439,7 +439,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, egress_firewall_policy_id=fwp_id, ports=fwg_ports, admin_state_up=True) as fwg1: self.assertEqual(nl_constants.PENDING_CREATE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) data = {'firewall_group': {'ports': [port_id2, port_id3]}} req = self.new_update_request('firewall_groups', data, fwg1['firewall_group']['id']) @@ -449,7 +449,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, def test_update_firewall_group_with_new_ports_status_active(self): """neutron firewall_group create test-policy """ with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id) as r, \ + tenant_id=self._tenant_id) as r, \ self.subnet() as s1, \ self.subnet(cidr='20.0.0.0/24') as s2, \ self.subnet(cidr='30.0.0.0/24') as s3: @@ -484,10 +484,11 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, egress_firewall_policy_id=fwp_id, ports=fwg_ports, admin_state_up=True) as fwg1: self.assertEqual(nl_constants.PENDING_CREATE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) ctx = context.get_admin_context() - self.callbacks.set_firewall_group_status(ctx, + self.callbacks.set_firewall_group_status( + ctx, fwg1['firewall_group']['id'], nl_constants.ACTIVE) data = {'firewall_group': {'ports': [port_id2, port_id3]}} req = self.new_update_request('firewall_groups', data, @@ -502,7 +503,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, name = "new_firewall_rule1" attrs = self._get_test_firewall_rule_attrs(name) with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id) as r, \ + tenant_id=self._tenant_id) as r, \ self.subnet() as s1: body = self._router_interface_action( @@ -522,14 +523,16 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, egress_firewall_policy_id=fwp_id, ports=[port_id1], admin_state_up=True) as fwg1: self.assertEqual(nl_constants.PENDING_CREATE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) ctx = context.get_admin_context() - self.callbacks.set_firewall_group_status(ctx, + self.callbacks.set_firewall_group_status( + ctx, fwg1['firewall_group']['id'], nl_constants.ACTIVE) data = {'firewall_rule': {'name': name}} - req = self.new_update_request('firewall_rules', data, - fwr['firewall_rule']['id']) + req = self.new_update_request( + 'firewall_rules', + data, fwr['firewall_rule']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) for k, v in attrs.items(): @@ -539,7 +542,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, """update should fail""" name = "new_firewall_rule1" with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id) as r, \ + tenant_id=self._tenant_id) as r, \ self.subnet() as s1: body = self._router_interface_action( @@ -559,11 +562,12 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, egress_firewall_policy_id=fwp_id, ports=[port_id1], admin_state_up=True) as fwg1: self.assertEqual(nl_constants.PENDING_CREATE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) data = {'firewall_rule': {'name': name}} - req = self.new_update_request('firewall_rules', data, - fwr['firewall_rule']['id']) + req = self.new_update_request( + 'firewall_rules', + data, fwr['firewall_rule']['id']) res = req.get_response(self.ext_api) self.assertEqual(409, res.status_int) @@ -627,7 +631,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, ports=fwg_ports, admin_state_up=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) fwp_id = fwp["firewall_policy"]["id"] data = {'firewall_group': {'ports': fwg_ports}} diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_firewall_agent_api.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_firewall_agent_api.py index 2b27f7182..56c7fa862 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_firewall_agent_api.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_firewall_agent_api.py @@ -66,7 +66,7 @@ class NoopFwaasDriverV2(fwaas_base_v2.FwaasDriverBase): class TestFWaaSAgentApi(base.BaseTestCase): def setUp(self): - super(TestFWaaSAgentApi, self).setUp() + super().setUp() self.api = api.FWaaSPluginApiMixin( 'topic', diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_firewall_service.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_firewall_service.py index 950cd4ed1..8ba365c6e 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_firewall_service.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_firewall_service.py @@ -27,7 +27,7 @@ FWAAS_NOP_DEVICE = ('neutron_fwaas.tests.unit.services.firewall.' class TestFirewallDeviceDriverLoading(base.BaseTestCase): def setUp(self): - super(TestFirewallDeviceDriverLoading, self).setUp() + super().setUp() self.service = firewall_service.FirewallService() def test_loading_firewall_device_driver(self): diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/ovn/test_firewall_l3_driver.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/ovn/test_firewall_l3_driver.py index 57d045c3c..657b45953 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/ovn/test_firewall_l3_driver.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/ovn/test_firewall_l3_driver.py @@ -38,7 +38,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, l3_plugin_str = ('neutron.tests.unit.extensions.test_l3.' 'TestL3NatServicePlugin') l3_plugin = {'l3_plugin_name': l3_plugin_str} - super(TestOVNFwaasDriver, self).setUp( + super().setUp( service_provider=OVN_FWAAS_DRIVER, extra_service_plugins=l3_plugin, extra_extension_paths=neutron_extensions.__path__) @@ -68,11 +68,11 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, admin_state_up=True, as_admin=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) def test_create_firewall_group_with_ports(self): with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id, as_admin=True) as r, \ + tenant_id=self._tenant_id, as_admin=True) as r, \ self.subnet(as_admin=True) as s1, \ self.subnet(cidr='20.0.0.0/24', as_admin=True) as s2: body = self._router_interface_action( @@ -108,7 +108,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, def test_update_firewall_group_with_new_ports(self): with self.router(name='router1', admin_state_up=True, - tenant_id=self._tenant_id, as_admin=True) as r, \ + tenant_id=self._tenant_id, as_admin=True) as r, \ self.subnet(as_admin=True) as s1, \ self.subnet(cidr='20.0.0.0/24', as_admin=True) as s2, \ self.subnet(cidr='30.0.0.0/24', as_admin=True) as s3: @@ -149,7 +149,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, ports=fwg_ports, admin_state_up=True, do_delete=False, as_admin=True) as fwg1: self.assertEqual(nl_constants.ACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) data = {'firewall_group': {'ports': [port_id2, port_id3]}} req = self.new_update_request('firewall_groups', data, fwg1['firewall_group']['id'], @@ -202,7 +202,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, do_delete=False, as_admin=True) as fwg1: self.assertEqual(nl_constants.ACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) fwp_id = fwp["firewall_policy"]["id"] data = {'firewall_group': {'ports': fwg_ports}} @@ -254,7 +254,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, do_delete=False, as_admin=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) new_rules = [fwr_id, fwr2_id, fwr3_id] data = {'firewall_policy': {'firewall_rules': @@ -290,7 +290,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, do_delete=False, as_admin=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) data = {'firewall_rule': {'enabled': False}} req = (self. @@ -325,7 +325,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, do_delete=False, as_admin=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) data = {'firewall_rule': {'enabled': True}} req = (self. @@ -361,7 +361,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, do_delete=False, as_admin=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) data = {'firewall_rule': {'action': 'deny'}} req = (self. @@ -398,7 +398,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, do_delete=False, as_admin=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) data = {'firewall_rule_id': fwr2_id, 'insert_after': fwr_id} @@ -436,7 +436,7 @@ class TestOVNFwaasDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, do_delete=False, as_admin=True) as fwg1: self.assertEqual(nl_constants.INACTIVE, - fwg1['firewall_group']['status']) + fwg1['firewall_group']['status']) data = {'firewall_rule_id': fwr2_id} req = (self. diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/test_driver_api.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/test_driver_api.py index d1b16a3c2..cd0122154 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/test_driver_api.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/test_driver_api.py @@ -30,7 +30,7 @@ class FireWallDriverDBMixinTestCase(test_fwaas_plugin_v2. def setUp(self): provider = ('neutron_fwaas.services.firewall.service_drivers.' 'driver_api.FirewallDriverDB') - super(FireWallDriverDBMixinTestCase, self).setUp( + super().setUp( service_provider=provider) self._mp_registry_publish = mock.patch( 'neutron_lib.callbacks.registry.publish') diff --git a/neutron_fwaas/tests/unit/services/firewall/test_fwaas_plugin_v2.py b/neutron_fwaas/tests/unit/services/firewall/test_fwaas_plugin_v2.py index 30e9f4434..fa02a3cbe 100644 --- a/neutron_fwaas/tests/unit/services/firewall/test_fwaas_plugin_v2.py +++ b/neutron_fwaas/tests/unit/services/firewall/test_fwaas_plugin_v2.py @@ -37,8 +37,8 @@ from neutron_fwaas.services.firewall.service_drivers.driver_api import \ def http_client_error(req, res): - explanation = "Request '%s %s %s' failed: %s" % (req.method, req.url, - req.body, res.body) + explanation = "Request '{} {} {}' failed: {}".format(req.method, req.url, + req.body, res.body) return webob.exc.HTTPClientError(code=res.status_int, explanation=explanation) @@ -65,10 +65,10 @@ class FirewallPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase): ADMIN_STATE_UP = True SHARED = True - resource_prefix_map = dict( - (k, firewall_v2.API_PREFIX) + resource_prefix_map = { + k: firewall_v2.API_PREFIX for k in firewall_v2.RESOURCE_ATTRIBUTE_MAP.keys() - ) + } def setUp(self, service_provider=None, core_plugin=None, extra_service_plugins=None, extra_extension_paths=None): @@ -109,7 +109,7 @@ class FirewallPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase): plugins, ) - super(FirewallPluginV2TestCase, self).setUp( + super().setUp( plugin=core_plugin, service_plugins=service_plugins, ext_mgr=ext_mgr, @@ -339,11 +339,12 @@ class FirewallPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase): firewall_policy = self.deserialize(fmt or self.fmt, res) fwp_id = firewall_policy["firewall_policy"]["id"] ingress_firewall_policy_id = fwp_id - data = {'firewall_group': {'name': name, - 'description': description, - 'ingress_firewall_policy_id': ingress_firewall_policy_id, - 'egress_firewall_policy_id': egress_firewall_policy_id, - 'admin_state_up': admin_state_up}} + data = {'firewall_group': { + 'name': name, + 'description': description, + 'ingress_firewall_policy_id': ingress_firewall_policy_id, + 'egress_firewall_policy_id': egress_firewall_policy_id, + 'admin_state_up': admin_state_up}} ctx = kwargs.get('context', None) if ctx is None or ctx.is_admin: tenant_id = kwargs.get('tenant_id', self._tenant_id) @@ -663,7 +664,7 @@ class TestAutomaticAssociation(TestFirewallPluginBasev2): def setUp(self): # TODO(yushiro): Replace constant value for this test class # Set auto association fwg - super(TestAutomaticAssociation, self).setUp() + super().setUp() def test_vm_port(self): port = { diff --git a/neutron_fwaas/tests/unit/services/logapi/agents/drivers/iptables/test_driver.py b/neutron_fwaas/tests/unit/services/logapi/agents/drivers/iptables/test_driver.py index ccb07564a..7b4b55a95 100644 --- a/neutron_fwaas/tests/unit/services/logapi/agents/drivers/iptables/test_driver.py +++ b/neutron_fwaas/tests/unit/services/logapi/agents/drivers/iptables/test_driver.py @@ -35,7 +35,7 @@ class FakeDriver(log_base_driver.DriverBase): class TestDriverBase(base.BaseTestCase): def setUp(self): - super(TestDriverBase, self).setUp() + super().setUp() self.driver = FakeDriver.create() def test_is_vif_type_compatible(self): diff --git a/neutron_fwaas/tests/unit/services/logapi/agents/drivers/iptables/test_log.py b/neutron_fwaas/tests/unit/services/logapi/agents/drivers/iptables/test_log.py index afb54ad83..85d895785 100644 --- a/neutron_fwaas/tests/unit/services/logapi/agents/drivers/iptables/test_log.py +++ b/neutron_fwaas/tests/unit/services/logapi/agents/drivers/iptables/test_log.py @@ -37,15 +37,15 @@ FAKE_BURST = 25 class TestLogPrefix(base.BaseTestCase): def setUp(self): - super(TestLogPrefix, self).setUp() + super().setUp() self.log_prefix = log.LogPrefix(FAKE_PORT_ID, 'fake_event', FAKE_PROJECT_ID) - self.log_prefix.log_object_refs = set([FAKE_LOG_ID]) + self.log_prefix.log_object_refs = {FAKE_LOG_ID} def test_add_log_obj_ref(self): added_log_id = test_base._uuid - expected_log_obj_ref = set([FAKE_LOG_ID, added_log_id]) + expected_log_obj_ref = {FAKE_LOG_ID, added_log_id} self.log_prefix.add_log_obj_ref(added_log_id) self.assertEqual(expected_log_obj_ref, self.log_prefix.log_object_refs) @@ -63,7 +63,7 @@ class TestLogPrefix(base.BaseTestCase): class BaseIptablesLogTestCase(base.BaseTestCase): def setUp(self): - super(BaseIptablesLogTestCase, self).setUp() + super().setUp() self.iptables_manager_patch = mock.patch( 'neutron.agent.linux.iptables_manager.IptablesManager') self.iptables_manager_mock = self.iptables_manager_patch.start() @@ -135,13 +135,13 @@ class BaseIptablesLogTestCase(base.BaseTestCase): self.log_driver.ipt_mgr_list = self._fake_ipt_mgr_list(f_router_ids) # Test with a port is delete from router - self.log_driver.unused_port_ids = set(['r1_port1']) + self.log_driver.unused_port_ids = {'r1_port1'} self.log_driver._cleanup_unused_ipt_mgrs() self.assertEqual(set(), self.log_driver.unused_port_ids) self.assertIsNone(self.log_driver.ipt_mgr_list['r1'].get('r1_port1')) # Test with all ports are deleted from router - self.log_driver.unused_port_ids = set(['r2_port1', 'r2_port2']) + self.log_driver.unused_port_ids = {'r2_port1', 'r2_port2'} self.log_driver._cleanup_unused_ipt_mgrs() self.assertEqual(set(), self.log_driver.unused_port_ids) self.assertIsNone(self.log_driver.ipt_mgr_list.get('r2')) @@ -245,7 +245,7 @@ class BaseIptablesLogTestCase(base.BaseTestCase): # Test with prefix already added into prefixes_table self.log_driver._add_nflog_rules_accepted(ipt_mgr, f_port_log) self.log_driver._add_rules_to_chain_v4v6.assert_not_called() - self.assertEqual(set(['fake_log_id']), + self.assertEqual({'fake_log_id'}, f_accept_prefix.log_object_refs) # Test with prefixes_tables does not include the prefix @@ -260,7 +260,7 @@ class BaseIptablesLogTestCase(base.BaseTestCase): assert_called_once_with(ipt_mgr, 'accepted', v4_rules, v6_rules, wrap=True, top=True, tag=prefix.id) - self.assertEqual(set(['fake_log_id']), + self.assertEqual({'fake_log_id'}, prefix.log_object_refs) def test_add_nflog_rules_dropped(self): @@ -283,7 +283,7 @@ class BaseIptablesLogTestCase(base.BaseTestCase): # Test with prefix already added into prefixes_table self.log_driver._add_log_rules_dropped(ipt_mgr, f_port_log) self.log_driver._add_rules_to_chain_v4v6.assert_not_called() - self.assertEqual(set(['fake_log_id']), + self.assertEqual({'fake_log_id'}, f_drop_prefix.log_object_refs) # Test with prefixes_tables does not include the prefix @@ -302,7 +302,7 @@ class BaseIptablesLogTestCase(base.BaseTestCase): ] self.log_driver._add_rules_to_chain_v4v6.\ assert_has_calls(calls) - self.assertEqual(set(['fake_log_id']), + self.assertEqual({'fake_log_id'}, prefix.log_object_refs) def _fake_port_log(self, log_id, event, port_id): @@ -318,8 +318,8 @@ class BaseIptablesLogTestCase(base.BaseTestCase): '-j NFLOG --nflog-prefix %s' % (device, FAKE_RATE, FAKE_BURST, tag)] v4_nflog_rule += ['-o %s -m limit --limit %s/sec --limit-burst %s ' - '-j NFLOG --nflog-prefix %s' - % (device, FAKE_RATE, FAKE_BURST, tag)] + '-j NFLOG --nflog-prefix %s' + % (device, FAKE_RATE, FAKE_BURST, tag)] v6_nflog_rule = ['-i %s -m limit --limit %s/sec --limit-burst %s ' '-j NFLOG --nflog-prefix %s' % (device, FAKE_RATE, FAKE_BURST, tag)] diff --git a/neutron_fwaas/tests/unit/services/logapi/agents/l3/test_fwg_log.py b/neutron_fwaas/tests/unit/services/logapi/agents/l3/test_fwg_log.py index 88400c988..2d6c45763 100644 --- a/neutron_fwaas/tests/unit/services/logapi/agents/l3/test_fwg_log.py +++ b/neutron_fwaas/tests/unit/services/logapi/agents/l3/test_fwg_log.py @@ -27,7 +27,7 @@ from neutron_fwaas.services.logapi.agents.l3 import fwg_log class FWaaSL3LoggingExtensionInitializeTestCase(base.L3LoggingExtBaseTestCase): def setUp(self): - super(FWaaSL3LoggingExtensionInitializeTestCase, self).setUp() + super().setUp() self.fw_l3_log_ext = fwg_log.FWaaSL3LoggingExtension() self.fw_l3_log_ext.consume_api(self.agent_api) diff --git a/neutron_fwaas/tests/unit/services/logapi/base.py b/neutron_fwaas/tests/unit/services/logapi/base.py index 78f610e28..97eb72dec 100644 --- a/neutron_fwaas/tests/unit/services/logapi/base.py +++ b/neutron_fwaas/tests/unit/services/logapi/base.py @@ -20,7 +20,7 @@ from neutron.tests.unit import testlib_api class BaseLogTestCase(testlib_api.SqlTestCase): def setUp(self): - super(BaseLogTestCase, self).setUp() + super().setUp() with mock.patch.object( resource_manager.ResourceCallbacksManager, '_singleton', diff --git a/neutron_fwaas/tests/unit/services/logapi/common/test_fwg_callback.py b/neutron_fwaas/tests/unit/services/logapi/common/test_fwg_callback.py index df984cdd8..18ecf1846 100644 --- a/neutron_fwaas/tests/unit/services/logapi/common/test_fwg_callback.py +++ b/neutron_fwaas/tests/unit/services/logapi/common/test_fwg_callback.py @@ -54,7 +54,7 @@ def fake_register(): class TestFirewallGroupRuleCallback(base.BaseTestCase): def setUp(self): - super(TestFirewallGroupRuleCallback, self).setUp() + super().setUp() self.driver_manager = driver_mgr.LoggingServiceDriverManager() self.fwg_callback = fwg_callback.FirewallGroupCallBack(mock.Mock(), mock.Mock()) diff --git a/neutron_fwaas/tests/unit/services/logapi/common/test_log_db_api.py b/neutron_fwaas/tests/unit/services/logapi/common/test_log_db_api.py index c35d49e77..94498e040 100644 --- a/neutron_fwaas/tests/unit/services/logapi/common/test_log_db_api.py +++ b/neutron_fwaas/tests/unit/services/logapi/common/test_log_db_api.py @@ -70,7 +70,7 @@ def _fake_port_object(port_id, device_owner, status, class LoggingRpcCallbackTestCase(base.BaseTestCase): def setUp(self): - super(LoggingRpcCallbackTestCase, self).setUp() + super().setUp() self.context = mock.Mock() self.rpc_callback = server_rpc.LoggingApiSkeleton() @@ -282,7 +282,7 @@ class LoggingRpcCallbackTestCase(base.BaseTestCase): unbound_resource = uuidutils.generate_uuid() resource_log = _create_log_object(tenant_id, unbound_resource) target_log = _create_log_object(tenant_id, unbound_resource, - target_id) + target_id) log_objs = [log, target_log, resource_log] with mock.patch.object(log_object.Log, 'get_objects', diff --git a/neutron_fwaas/tests/unit/services/logapi/common/test_port_callback.py b/neutron_fwaas/tests/unit/services/logapi/common/test_port_callback.py index 0271a5239..025e5f58e 100644 --- a/neutron_fwaas/tests/unit/services/logapi/common/test_port_callback.py +++ b/neutron_fwaas/tests/unit/services/logapi/common/test_port_callback.py @@ -54,7 +54,7 @@ def fake_register(): class TestFirewallGroupRuleCallback(base.BaseTestCase): def setUp(self): - super(TestFirewallGroupRuleCallback, self).setUp() + super().setUp() self.driver_manager = driver_mgr.LoggingServiceDriverManager() self.port_callback = port_callback.NeutronPortCallBack(mock.Mock(), mock.Mock()) diff --git a/neutron_fwaas/tests/unit/services/logapi/test_fwg_validate.py b/neutron_fwaas/tests/unit/services/logapi/test_fwg_validate.py index 6d4e2470d..c0ca9c296 100644 --- a/neutron_fwaas/tests/unit/services/logapi/test_fwg_validate.py +++ b/neutron_fwaas/tests/unit/services/logapi/test_fwg_validate.py @@ -30,7 +30,7 @@ class TestFWGLogRequestValidations(base.BaseTestCase): """Test validator for a log creation request""" def setUp(self): - super(TestFWGLogRequestValidations, self).setUp() + super().setUp() fwg_validate.fwg_plugin = mock.Mock() fwg_validate.fwg_plugin.driver = mock.Mock() fwg_validate.fwg_plugin.driver.firewall_db = mock.Mock() diff --git a/releasenotes/source/conf.py b/releasenotes/source/conf.py index e9192733c..25247e49e 100644 --- a/releasenotes/source/conf.py +++ b/releasenotes/source/conf.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at