Merge "pyupgrade changes for Python3.9+"

This commit is contained in:
Zuul 2024-11-21 18:55:14 +00:00 committed by Gerrit Code Review
commit 2c644d3a5a
45 changed files with 242 additions and 260 deletions

View File

@ -37,7 +37,6 @@ disable=
expression-not-assigned,
fixme,
global-statement,
non-parent-init-called,
not-callable,
protected-access,
redefined-builtin,
@ -89,8 +88,6 @@ disable=
too-many-return-statements,
too-many-statements,
consider-using-set-comprehension,
useless-object-inheritance,
super-with-arguments,
use-dict-literal
[BASIC]

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2010 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -63,7 +63,7 @@ class VPNAgentOvnSbIdl(ovsdb_monitor.OvnIdl):
return impl_idl_ovn.OvsdbSbOvnIdl(conn)
class VPNAgentOvsIdl(object):
class VPNAgentOvsIdl:
def start(self):
connection_string = config.cfg.CONF.ovs.ovsdb_connection

View File

@ -23,7 +23,7 @@ from neutron_vpnaas.services.vpn.common import topics
AGENT_NOTIFY_MAX_ATTEMPTS = 2
class VPNAgentNotifyAPI(object):
class VPNAgentNotifyAPI:
"""API for plugin to notify VPN agent."""
def __init__(self, topic=topics.IPSEC_AGENT_TOPIC):

View File

@ -228,9 +228,9 @@ class VPNPluginDb(vpnaas.VPNPluginBase,
if "peer_cidrs" in ipsec_sitecon:
changed_peer_cidrs = True
old_peer_cidr_list = ipsec_site_conn_db['peer_cidrs']
old_peer_cidr_dict = dict(
(peer_cidr['cidr'], peer_cidr)
for peer_cidr in old_peer_cidr_list)
old_peer_cidr_dict = {
peer_cidr['cidr']: peer_cidr
for peer_cidr in old_peer_cidr_list}
new_peer_cidr_set = set(ipsec_sitecon["peer_cidrs"])
old_peer_cidr_set = set(old_peer_cidr_dict)
@ -714,7 +714,7 @@ class VPNPluginDb(vpnaas.VPNPluginBase,
return cidrs
class VPNPluginRpcDbMixin(object):
class VPNPluginRpcDbMixin:
def _build_local_subnet_cidr_map(self, context):
"""Build a dict of all local endpoint subnets, with list of CIDRs."""
query = context.session.query(models_v2.Subnet.id,

View File

@ -80,7 +80,7 @@ class VPNExtGW(model_base.BASEV2, model_base.HasId, model_base.HasProject):
@registry.has_registry_receivers
class VPNExtGWPlugin_db(object):
class VPNExtGWPlugin_db:
"""DB class to support vpn external ports configuration."""
@property

View File

@ -27,7 +27,7 @@ from neutron_vpnaas._i18n import _
from neutron_vpnaas.services.vpn.common import constants
class VpnReferenceValidator(object):
class VpnReferenceValidator:
"""
Baseline validation routines for VPN resources.
@ -118,7 +118,7 @@ class VpnReferenceValidator(object):
"""
if len(local_subnets) == 1:
return local_subnets[0]['ip_version']
ip_versions = set([subnet['ip_version'] for subnet in local_subnets])
ip_versions = {subnet['ip_version'] for subnet in local_subnets}
if len(ip_versions) > 1:
raise vpn_exception.MixedIPVersionsForIPSecEndpoints(
group=group_id)
@ -131,7 +131,7 @@ class VpnReferenceValidator(object):
"""
if len(peer_cidrs) == 1:
return netaddr.IPNetwork(peer_cidrs[0]).version
ip_versions = set([netaddr.IPNetwork(pc).version for pc in peer_cidrs])
ip_versions = {netaddr.IPNetwork(pc).version for pc in peer_cidrs}
if len(ip_versions) > 1:
raise vpn_exception.MixedIPVersionsForIPSecEndpoints(
group=group_id)
@ -149,7 +149,7 @@ class VpnReferenceValidator(object):
"""Ensure all CIDRs have the same IP version."""
if len(peer_cidrs) == 1:
return netaddr.IPNetwork(peer_cidrs[0]).version
ip_versions = set([netaddr.IPNetwork(pc).version for pc in peer_cidrs])
ip_versions = {netaddr.IPNetwork(pc).version for pc in peer_cidrs}
if len(ip_versions) > 1:
raise vpn_exception.MixedIPVersionsForPeerCidrs()
return ip_versions.pop()

View File

@ -161,7 +161,7 @@ class RouterReschedulingFailed(exceptions.Conflict):
"No eligible VPN agent found.")
class VPNAgentSchedulerPluginBase(object, metaclass=abc.ABCMeta):
class VPNAgentSchedulerPluginBase(metaclass=abc.ABCMeta):
"""REST API to operate the VPN agent scheduler.
All methods must be in an admin context.

View File

@ -36,7 +36,7 @@ class Vpn_endpoint_groups(extensions.APIExtensionDescriptor):
translate_name=True)
class VPNEndpointGroupsPluginBase(object, metaclass=abc.ABCMeta):
class VPNEndpointGroupsPluginBase(metaclass=abc.ABCMeta):
@abc.abstractmethod
def create_endpoint_group(self, context, endpoint_group):

View File

@ -26,7 +26,7 @@ from neutron_vpnaas.extensions import vpn_agentschedulers
LOG = logging.getLogger(__name__)
class VPNScheduler(object, metaclass=abc.ABCMeta):
class VPNScheduler(metaclass=abc.ABCMeta):
@property
def l3_plugin(self):
return directory.get_plugin(plugin_constants.L3)

View File

@ -114,5 +114,4 @@ class L3WithVPNaaS(VPNAgent):
self.conf = conf
else:
self.conf = cfg.CONF
super(L3WithVPNaaS, self).__init__(
host=self.conf.host, conf=self.conf)
super().__init__(host=self.conf.host, conf=self.conf)

View File

@ -15,7 +15,7 @@
import abc
class DeviceDriver(object, metaclass=abc.ABCMeta):
class DeviceDriver(metaclass=abc.ABCMeta):
def __init__(self, agent, host):
pass

View File

@ -136,7 +136,7 @@ def _get_template(template_file):
return JINJA_ENV.get_template(template_file)
class BaseSwanProcess(object, metaclass=abc.ABCMeta):
class BaseSwanProcess(metaclass=abc.ABCMeta):
"""Swan Family Process Manager
This class manages start/restart/stop ipsec process.
@ -452,8 +452,7 @@ class OpenSwanProcess(BaseSwanProcess):
IPSEC_CONF_NAT_TRAVERSAL = "yes"
def __init__(self, conf, process_id, vpnservice, namespace):
super(OpenSwanProcess, self).__init__(conf, process_id,
vpnservice, namespace)
super().__init__(conf, process_id, vpnservice, namespace)
self.secrets_file = os.path.join(
self.etc_dir, 'ipsec.secrets')
self.config_file = os.path.join(
@ -508,7 +507,7 @@ class OpenSwanProcess(BaseSwanProcess):
# on throwing to tell us something. If the pid file exists,
# delve into the process information and check if it matches
# our expected command line.
with open(self.pid_file, 'r') as f:
with open(self.pid_file) as f:
pid = f.readline().strip()
with open('/proc/%s/cmdline' % pid) as cmd_line_file:
cmd_line = cmd_line_file.readline()
@ -525,7 +524,7 @@ class OpenSwanProcess(BaseSwanProcess):
{'pid': pid, 'cmd_line': cmd_line})
return True
except IOError as e:
except OSError as e:
# This is logged as "info" instead of error because it simply
# means that we couldn't find the files to check on them.
LOG.info('Unable to find control files on startup for '
@ -638,7 +637,7 @@ class OpenSwanProcess(BaseSwanProcess):
nets += ipsec_site_conn['peer_cidrs']
for net in nets:
version = netaddr.IPNetwork(net).version
virtual_privates.append('%%v%s:%s' % (version, net))
virtual_privates.append('%v{}:{}'.format(version, net))
virtual_privates.sort()
return ','.join(virtual_privates)
@ -788,7 +787,7 @@ class OpenSwanProcess(BaseSwanProcess):
self.connection_status = {}
class IPsecVpnDriverApi(object):
class IPsecVpnDriverApi:
"""IPSecVpnDriver RPC api."""
@log_helpers.log_method_call
@ -838,7 +837,7 @@ class IPsecDriver(device_drivers.DeviceDriver, metaclass=abc.ABCMeta):
self.conn = n_rpc.Connection()
self.context = context.get_admin_context_without_session()
self.topic = topics.IPSEC_AGENT_TOPIC
node_topic = '%s.%s' % (self.topic, self.host)
node_topic = '{}.{}'.format(self.topic, self.host)
self.processes = {}
self.routers: ty.Dict[str, ty.Any] = {}

View File

@ -28,8 +28,7 @@ class LibreSwanProcess(ipsec.OpenSwanProcess):
# pylint: disable=useless-super-delegation
def __init__(self, conf, process_id, vpnservice, namespace):
self._rootwrap_cfg = self._get_rootwrap_config()
super(LibreSwanProcess, self).__init__(conf, process_id,
vpnservice, namespace)
super().__init__(conf, process_id, vpnservice, namespace)
def _ipsec_execute(self, cmd, check_exit_code=True, extra_ok_codes=None):
"""Execute ipsec command on namespace.
@ -41,7 +40,7 @@ class LibreSwanProcess(ipsec.OpenSwanProcess):
mount_paths = {'/etc': '%s/etc' % self.config_dir,
'/run': '%s/var/run' % self.config_dir}
mount_paths_str = ','.join(
"%s:%s" % (source, target)
"{}:{}".format(source, target)
for source, target in mount_paths.items())
ns_wrapper = self.get_ns_wrapper()
return ip_wrapper.netns.execute(
@ -49,7 +48,7 @@ class LibreSwanProcess(ipsec.OpenSwanProcess):
'--mount_paths=%s' % mount_paths_str,
('--rootwrap_config=%s' % self._rootwrap_cfg
if self._rootwrap_cfg else ''),
'--cmd=%s,%s' % (self.binary, ','.join(cmd))],
'--cmd={},{}'.format(self.binary, ','.join(cmd))],
check_exit_code=check_exit_code,
extra_ok_codes=extra_ok_codes)
@ -75,7 +74,7 @@ class LibreSwanProcess(ipsec.OpenSwanProcess):
if os.path.exists(secrets_file):
self._execute(['rm', '-f', secrets_file])
super(LibreSwanProcess, self).ensure_configs()
super().ensure_configs()
# LibreSwan uses the capabilities library to restrict access to
# ipsec.secrets to users that have explicit access. Since pluto is

View File

@ -38,7 +38,7 @@ PORT_PREFIXES = {
LOG = logging.getLogger(__name__)
class DeviceManager(object):
class DeviceManager:
"""Device Manager for ports in qvpn-xx namespace.
It is a veth pair, one side in qvpn and the other
side is attached to ovs.
@ -143,7 +143,7 @@ class DeviceManager(object):
subnet_id = fixed_ip['subnet_id']
subnet = self.plugin.get_subnet_info(subnet_id)
net = netaddr.IPNetwork(subnet['cidr'])
ip_cidr = '%s/%s' % (fixed_ip['ip_address'], net.prefixlen)
ip_cidr = '{}/{}'.format(fixed_ip['ip_address'], net.prefixlen)
ip_cidrs.append(ip_cidr)
subnets.append(subnet)
self.driver.init_l3(interface_name, ip_cidrs,
@ -172,14 +172,14 @@ class DeviceManager(object):
ip_cidrs = []
for fixed_ip in vpn_port['fixed_ips']:
ip_cidr = '%s/%s' % (fixed_ip['ip_address'], 28)
ip_cidr = '{}/{}'.format(fixed_ip['ip_address'], 28)
ip_cidrs.append(ip_cidr)
self.driver.init_l3(interface_name, ip_cidrs,
namespace=ns_name)
return interface_name
class NamespaceManager(object):
class NamespaceManager:
def __init__(self, use_ipv6=False):
self.ip_wrapper_root = ip_lib.IPWrapper()
self.use_ipv6 = use_ipv6

View File

@ -82,8 +82,7 @@ class StrongSwanProcess(ipsec.BaseSwanProcess):
self._strongswan_piddir = self._get_strongswan_piddir()
self._rootwrap_cfg = self._get_rootwrap_config()
LOG.debug("strongswan piddir is '%s'", (self._strongswan_piddir))
super(StrongSwanProcess, self).__init__(conf, process_id,
vpnservice, namespace)
super().__init__(conf, process_id, vpnservice, namespace)
def _get_strongswan_piddir(self):
return utils.execute(
@ -113,7 +112,7 @@ class StrongSwanProcess(ipsec.BaseSwanProcess):
ns_wrapper = self.get_ns_wrapper()
return ip_wrapper.netns.execute(
[ns_wrapper,
'--mount_paths=/etc:%s/etc,%s:%s/var/run' % (
'--mount_paths=/etc:{}/etc,{}:{}/var/run'.format(
self.config_dir, self._strongswan_piddir, self.config_dir),
('--rootwrap_config=%s' % self._rootwrap_cfg
if self._rootwrap_cfg else ''),

View File

@ -73,7 +73,7 @@ class ChassisVPNAgentWriteEvent(ovsdb_monitor.ChassisAgentEvent):
clear_down=True)
class OVNVPNAgentMonitor(object):
class OVNVPNAgentMonitor:
def watch_agent_events(self):
l3_plugin = directory.get_plugin(plugin_constants.L3)
sb_ovn = l3_plugin._sb_ovn

View File

@ -1,4 +1,3 @@
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
@ -55,7 +54,7 @@ class VPNDriverPlugin(VPNPlugin, vpn_db.VPNPluginRpcDbMixin):
"""VpnPlugin which supports VPN Service Drivers."""
#TODO(nati) handle ikepolicy and ipsecpolicy update usecase
def __init__(self):
super(VPNDriverPlugin, self).__init__()
super().__init__()
self.service_type_manager = st_db.ServiceTypeManager.get_instance()
add_provider_configuration(self.service_type_manager, constants.VPN)
# Load the service driver from neutron.conf.
@ -167,8 +166,7 @@ class VPNDriverPlugin(VPNPlugin, vpn_db.VPNPluginRpcDbMixin):
driver.validator.validate_ipsec_site_connection(
context,
ipsec_site_connection['ipsec_site_connection'])
ipsec_site_connection = super(
VPNDriverPlugin, self).create_ipsec_site_connection(
ipsec_site_connection = super().create_ipsec_site_connection(
context, ipsec_site_connection)
driver.create_ipsec_site_connection(context, ipsec_site_connection)
return ipsec_site_connection
@ -176,7 +174,7 @@ class VPNDriverPlugin(VPNPlugin, vpn_db.VPNPluginRpcDbMixin):
def delete_ipsec_site_connection(self, context, ipsec_conn_id):
ipsec_site_connection = self.get_ipsec_site_connection(
context, ipsec_conn_id)
super(VPNDriverPlugin, self).delete_ipsec_site_connection(
super().delete_ipsec_site_connection(
context, ipsec_conn_id)
driver = self._get_driver_for_ipsec_site_connection(
context, ipsec_site_connection)
@ -192,8 +190,7 @@ class VPNDriverPlugin(VPNPlugin, vpn_db.VPNPluginRpcDbMixin):
driver.validator.validate_ipsec_site_connection(
context,
ipsec_site_connection['ipsec_site_connection'])
ipsec_site_connection = super(
VPNDriverPlugin, self).update_ipsec_site_connection(
ipsec_site_connection = super().update_ipsec_site_connection(
context,
ipsec_conn_id,
ipsec_site_connection)
@ -204,8 +201,7 @@ class VPNDriverPlugin(VPNPlugin, vpn_db.VPNPluginRpcDbMixin):
def create_vpnservice(self, context, vpnservice):
provider = self._get_provider_for_flavor(
context, vpnservice['vpnservice'].get('flavor_id'))
vpnservice = super(
VPNDriverPlugin, self).create_vpnservice(context, vpnservice)
vpnservice = super().create_vpnservice(context, vpnservice)
self.service_type_manager.add_resource_association(
context, constants.VPN, provider, vpnservice['id'])
driver = self.drivers[provider]
@ -214,8 +210,7 @@ class VPNDriverPlugin(VPNPlugin, vpn_db.VPNPluginRpcDbMixin):
def update_vpnservice(self, context, vpnservice_id, vpnservice):
old_vpn_service = self.get_vpnservice(context, vpnservice_id)
new_vpn_service = super(
VPNDriverPlugin, self).update_vpnservice(context, vpnservice_id,
new_vpn_service = super().update_vpnservice(context, vpnservice_id,
vpnservice)
driver = self._get_driver_for_vpnservice(context, old_vpn_service)
driver.update_vpnservice(context, old_vpn_service, new_vpn_service)
@ -223,7 +218,7 @@ class VPNDriverPlugin(VPNPlugin, vpn_db.VPNPluginRpcDbMixin):
def delete_vpnservice(self, context, vpnservice_id):
vpnservice = self._get_vpnservice(context, vpnservice_id)
super(VPNDriverPlugin, self).delete_vpnservice(context, vpnservice_id)
super().delete_vpnservice(context, vpnservice_id)
driver = self._get_driver_for_vpnservice(context, vpnservice)
self.service_type_manager.del_resource_associations(
context, [vpnservice_id])

View File

@ -26,7 +26,7 @@ from neutron_vpnaas.services.vpn.service_drivers import driver_validator
LOG = logging.getLogger(__name__)
class VpnDriver(object, metaclass=abc.ABCMeta):
class VpnDriver(metaclass=abc.ABCMeta):
def __init__(self, service_plugin, validator=None):
self.service_plugin = service_plugin
@ -70,7 +70,7 @@ class VpnDriver(object, metaclass=abc.ABCMeta):
pass
class BaseIPsecVpnAgentApi(object):
class BaseIPsecVpnAgentApi:
"""Base class for IPSec API to agent."""
def __init__(self, topic, default_version, driver):

View File

@ -31,7 +31,7 @@ IPSEC = 'ipsec'
BASE_IPSEC_VERSION = '1.0'
class IPsecVpnDriverCallBack(object):
class IPsecVpnDriverCallBack:
"""Callback for IPSecVpnDriver rpc."""
# history
@ -40,7 +40,7 @@ class IPsecVpnDriverCallBack(object):
target = oslo_messaging.Target(version=BASE_IPSEC_VERSION)
def __init__(self, driver):
super(IPsecVpnDriverCallBack, self).__init__()
super().__init__()
self.driver = driver
def _get_agent_hosting_vpn_services(self, context, host):
@ -94,15 +94,14 @@ class IPsecVpnAgentApi(service_drivers.BaseIPsecVpnAgentApi):
# pylint: disable=useless-super-delegation
def __init__(self, topic, default_version, driver):
super(IPsecVpnAgentApi, self).__init__(
topic, default_version, driver)
super().__init__(topic, default_version, driver)
class BaseIPsecVPNDriver(service_drivers.VpnDriver, metaclass=abc.ABCMeta):
"""Base VPN Service Driver class."""
def __init__(self, service_plugin, validator=None):
super(BaseIPsecVPNDriver, self).__init__(service_plugin, validator)
super().__init__(service_plugin, validator)
self.create_rpc_conn()
@property
@ -246,7 +245,8 @@ class BaseIPsecVPNDriver(service_drivers.VpnDriver, metaclass=abc.ABCMeta):
peer_cidr.cidr
for peer_cidr in ipsec_site_connection.peer_cidrs]
else:
local_cidrs = [local_cidr_map[ep.endpoint]
local_cidrs = [
local_cidr_map[ep.endpoint]
for ep in ipsec_site_connection.local_ep_group.endpoints]
peer_cidrs = [
ep.endpoint

View File

@ -14,7 +14,7 @@
#
class VpnDriverValidator(object):
class VpnDriverValidator:
"""Driver-specific validation routines for VPN resources."""
def __init__(self, driver):

View File

@ -28,8 +28,7 @@ class IPsecVPNDriver(base_ipsec.BaseIPsecVPNDriver):
"""VPN Service Driver class for IPsec."""
def __init__(self, service_plugin):
super(IPsecVPNDriver, self).__init__(
service_plugin,
super().__init__(service_plugin,
ipsec_validator.IpsecVpnValidator(self))
def create_rpc_conn(self):

View File

@ -306,20 +306,20 @@ class BaseOvnIPsecVPNDriver(base_ipsec.BaseIPsecVPNDriver):
router = self.l3_plugin.get_router(context, router_id)
old_routes = router.get('routes', [])
old_cidrs = set([r['destination'] for r in old_routes
if r['nexthop'] == nexthop])
old_cidrs = {r['destination'] for r in old_routes
if r['nexthop'] == nexthop}
new_cidrs = set(
self.service_plugin.get_peer_cidrs_for_router(context, router_id))
to_remove = old_cidrs - new_cidrs
if to_remove:
self.l3_plugin.remove_extraroutes(context, router_id,
self._routes_update(to_remove, nexthop))
self.l3_plugin.remove_extraroutes(
context, router_id, self._routes_update(to_remove, nexthop))
to_add = new_cidrs - old_cidrs
if to_add:
self.l3_plugin.add_extraroutes(context, router_id,
self._routes_update(to_add, nexthop))
self.l3_plugin.add_extraroutes(
context, router_id, self._routes_update(to_add, nexthop))
def _get_gateway_ips(self, router):
"""Obtain the IPv4 and/or IPv6 GW IP for the router.

View File

@ -23,7 +23,7 @@ LOG = logging.getLogger(__name__)
DEVICE_DRIVERS = 'device_drivers'
class VPNService(object):
class VPNService:
"""VPN Service observer."""
def __init__(self, l3_agent):

View File

@ -166,9 +166,9 @@ FAKE_ROUTER = {
}
# It's a long name.
NON_ASCII_VPNSERVICE_NAME = u'\u9577\u3044\u540d\u524d\u3067\u3059'
NON_ASCII_VPNSERVICE_NAME = '\u9577\u3044\u540d\u524d\u3067\u3059'
# I'm doing very well.
NON_ASCII_PSK = u'\u00e7a va tr\u00e9s bien'
NON_ASCII_PSK = '\u00e7a va tr\u00e9s bien'
def get_ovs_bridge(br_name):
@ -178,7 +178,7 @@ def get_ovs_bridge(br_name):
Vm = collections.namedtuple('Vm', ['namespace', 'port_ip'])
class SiteInfo(object):
class SiteInfo:
"""Holds info on the router, ports, service, and connection."""
@ -266,16 +266,16 @@ class SiteInfoWithHaRouter(SiteInfo):
self.failover_host = failover_host
self.get_ns_name = mock.patch.object(n_namespaces.RouterNamespace,
'_get_ns_name').start()
super(SiteInfoWithHaRouter, self).__init__(public_net, private_nets)
super().__init__(public_net, private_nets)
def generate_router_info(self):
super(SiteInfoWithHaRouter, self).generate_router_info()
super().generate_router_info()
self.info['ha'] = True
self.info['ha_vr_id'] = 1
self.info[constants.HA_INTERFACE_KEY] = (
l3_test_common.get_ha_interface())
# Mock router namespace name, for when router is created
self.get_ns_name.return_value = "qrouter-{0}-{1}".format(
self.get_ns_name.return_value = "qrouter-{}-{}".format(
self.info['id'], self.host)
def generate_backup_router_info(self):
@ -285,7 +285,7 @@ class SiteInfoWithHaRouter(SiteInfo):
l3_test_common.get_ha_interface(ip='169.254.192.2',
mac='22:22:22:22:22:22'))
# Mock router namespace name, for when router is created
self.get_ns_name.return_value = "qrouter-{0}-{1}".format(
self.get_ns_name.return_value = "qrouter-{}-{}".format(
info['id'], self.failover_host)
return info
@ -294,7 +294,7 @@ class TestIPSecBase(framework.L3AgentTestFramework):
NESTED_NAMESPACE_SEPARATOR = '@'
def setUp(self):
super(TestIPSecBase, self).setUp()
super().setUp()
common_config.register_common_config_options()
mock.patch('neutron_vpnaas.services.vpn.device_drivers.ipsec.'
'IPsecVpnDriverApi').start()
@ -374,7 +374,7 @@ class TestIPSecBase(framework.L3AgentTestFramework):
config.set_override('external_pids',
get_temp_file_path('external/pids'))
config.set_override('host', host)
ipsec_config_base_dir = '%s/%s' % (temp_dir.path, 'ipsec')
ipsec_config_base_dir = '{}/{}'.format(temp_dir.path, 'ipsec')
config.set_override('config_base_dir',
ipsec_config_base_dir, group='ipsec')
@ -404,7 +404,7 @@ class TestIPSecBase(framework.L3AgentTestFramework):
def _append_suffix(dev_name):
# If dev_name = 'xyz123' and the suffix is 'agent2' then the result
# will be 'xy-nt2'
return "{0}-{1}".format(dev_name[:-4], agent.host[-3:])
return f"{dev_name[:-4]}-{agent.host[-3:]}"
def get_internal_device_name(port_id):
return _append_suffix(

View File

@ -27,7 +27,7 @@ STATUS_PATTERN = re.compile('Command:.*ip.*addr.*show.*Exit code: 0')
class TestNetnsWrapper(base.BaseSudoTestCase):
def setUp(self):
super(TestNetnsWrapper, self).setUp()
super().setUp()
config.setup_logging()
self.fake_ns = 'func-8f1b728c-6eca-4042-9b6b-6ef66ab9352a'
self.mount_paths = ('--mount_paths=/etc:/var/lib/neutron'

View File

@ -109,7 +109,7 @@ class TestStrongSwanDeviceDriver(base.BaseSudoTestCase):
"""Test the StrongSwan reference implementation of the device driver."""
def setUp(self):
super(TestStrongSwanDeviceDriver, self).setUp()
super().setUp()
self.conf = cfg.CONF
self.conf.register_opts(l3_config.OPTS)
self.conf.register_opts(ipsec.ipsec_opts, 'ipsec')
@ -170,7 +170,7 @@ class TestStrongSwanDeviceDriver(base.BaseSudoTestCase):
class TestStrongSwanScenario(test_scenario.TestIPSecBase):
def setUp(self):
super(TestStrongSwanScenario, self).setUp()
super().setUp()
self.conf.register_opts(strongswan_ipsec.strongswan_opts,
'strongswan')
VPNAAS_STRONGSWAN_DEVICE = ('neutron_vpnaas.services.vpn.'

View File

@ -44,7 +44,7 @@ VPN_HOSTA = "host-1"
VPN_HOSTB = "host-2"
class VPNAgentSchedulerTestMixIn(object):
class VPNAgentSchedulerTestMixIn:
def _request_list(self, path, admin_context=True,
expected_code=exc.HTTPOk.code):
req = self._path_req(path, admin_context=admin_context)
@ -85,7 +85,7 @@ class VPNAgentSchedulerTestMixIn(object):
def _list_routers_hosted_by_vpn_agent(self, agent_id,
expected_code=exc.HTTPOk.code,
admin_context=True):
path = "/agents/%s/%s.%s" % (agent_id,
path = "/agents/{}/{}.{}".format(agent_id,
vpn_agentschedulers.VPN_ROUTERS,
self.fmt)
return self._request_list(path, expected_code=expected_code,
@ -94,7 +94,7 @@ class VPNAgentSchedulerTestMixIn(object):
def _add_router_to_vpn_agent(self, id, router_id,
expected_code=exc.HTTPCreated.code,
admin_context=True):
path = "/agents/%s/%s.%s" % (id,
path = "/agents/{}/{}.{}".format(id,
vpn_agentschedulers.VPN_ROUTERS,
self.fmt)
req = self._path_create_request(path,
@ -106,7 +106,7 @@ class VPNAgentSchedulerTestMixIn(object):
def _list_vpn_agents_hosting_router(self, router_id,
expected_code=exc.HTTPOk.code,
admin_context=True):
path = "/routers/%s/%s.%s" % (router_id,
path = "/routers/{}/{}.{}".format(router_id,
vpn_agentschedulers.VPN_AGENTS,
self.fmt)
return self._request_list(path, expected_code=expected_code,
@ -115,7 +115,7 @@ class VPNAgentSchedulerTestMixIn(object):
def _remove_router_from_vpn_agent(self, id, router_id,
expected_code=exc.HTTPNoContent.code,
admin_context=True):
path = "/agents/%s/%s/%s.%s" % (id,
path = "/agents/{}/{}/{}.{}".format(id,
vpn_agentschedulers.VPN_ROUTERS,
router_id,
self.fmt)

View File

@ -65,16 +65,16 @@ class TestVpnCorePlugin(test_l3_plugin.TestL3NatIntPlugin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin):
def __init__(self, configfile=None):
super(TestVpnCorePlugin, self).__init__()
super().__init__()
self.router_scheduler = l3_agent_scheduler.ChanceScheduler()
class VPNTestMixin(object):
resource_prefix_map = dict(
(k.replace('_', '-'),
"/vpn")
class VPNTestMixin:
resource_prefix_map = {
k.replace('_', '-'):
"/vpn"
for k in vpn.RESOURCE_ATTRIBUTE_MAP
)
}
def _create_ikepolicy(self, fmt,
name='ikepolicy1',
@ -412,14 +412,14 @@ class VPNTestMixin(object):
def _check_ipsec_site_connection(self, ipsec_site_connection, keys, dpd):
self.assertEqual(
keys,
dict((k, v) for k, v
{k: v for k, v
in ipsec_site_connection.items()
if k in keys))
if k in keys})
self.assertEqual(
dpd,
dict((k, v) for k, v
{k: v for k, v
in ipsec_site_connection['dpd'].items()
if k in dpd))
if k in dpd})
def _set_active(self, model, resource_id):
service_plugin = directory.get_plugin(nconstants.VPN)
@ -463,10 +463,7 @@ class VPNPluginDbTestCase(VPNTestMixin,
plugin_str = ('neutron_vpnaas.tests.unit.db.vpn.'
'test_vpn_db.TestVpnCorePlugin')
super(VPNPluginDbTestCase, self).setUp(
plugin_str,
service_plugins=service_plugins
)
super().setUp(plugin_str, service_plugins=service_plugins)
self._subnet_id = _uuid()
self.core_plugin = TestVpnCorePlugin()
self.plugin = vpn_plugin.VPNPlugin()
@ -488,7 +485,7 @@ class TestVpnaas(VPNPluginDbTestCase):
# NOTE(armax): make sure that the callbacks needed by this test are
# registered, as they may get wiped out depending by the order in
# which imports, subscriptions and mocks occur.
super(TestVpnaas, self).setUp(**kwargs)
super().setUp(**kwargs)
vpn_db.subscribe()
def _check_policy(self, policy, keys, lifetime):
@ -944,9 +941,9 @@ class TestVpnaas(VPNPluginDbTestCase):
router=router,
description=description,
**extras) as vpnservice:
self.assertEqual(dict((k, v) for k, v in
self.assertEqual({k: v for k, v in
vpnservice['vpnservice'].items()
if k in expected),
if k in expected},
expected)
def test_delete_router_interface_in_use_by_vpnservice(self):
@ -1718,7 +1715,7 @@ class TestVpnaas(VPNPluginDbTestCase):
# tests.
# TODO(pcm): Put helpers in another module for sharing
class NeutronResourcesMixin(object):
class NeutronResourcesMixin:
def create_network(self, overrides=None):
"""Create database entry for network."""
@ -1815,7 +1812,7 @@ class TestVpnDatabase(base.NeutronDbPluginV2TestCase, NeutronResourcesMixin):
# Setup the core plugin
self.plugin_str = ('neutron_vpnaas.tests.unit.db.vpn.'
'test_vpn_db.TestVpnCorePlugin')
super(TestVpnDatabase, self).setUp(self.plugin_str)
super().setUp(self.plugin_str)
# Get the plugins
self.core_plugin = directory.get_plugin()
self.l3_plugin = directory.get_plugin(nconstants.L3)
@ -2025,7 +2022,8 @@ class TestVpnDatabase(base.NeutronDbPluginV2TestCase, NeutronResourcesMixin):
expected1.update({'id': group_id1})
expected2.update({'id': group_id2})
expected_groups = [expected1, expected2]
actual_groups = self.plugin.get_endpoint_groups(self.context,
actual_groups = self.plugin.get_endpoint_groups(
self.context,
fields=('type', 'tenant_id', 'endpoints',
'name', 'description', 'id'))
for expected_group, actual_group in zip(expected_groups,

View File

@ -41,7 +41,7 @@ IPV6 = 6
class TestVpnValidation(base.BaseTestCase):
def setUp(self):
super(TestVpnValidation, self).setUp()
super().setUp()
self.l3_plugin = mock.Mock()
self.core_plugin = mock.Mock()
directory.add_plugin(nconstants.CORE, self.core_plugin)

View File

@ -27,7 +27,7 @@ class DummyIPsecVPNDriver(base_ipsec.BaseIPsecVPNDriver):
"""Dummy VPN Service Driver class for IPsec."""
def __init__(self, service_plugin):
super(DummyIPsecVPNDriver, self).__init__(
super().__init__(
service_plugin,
ipsec_validator.IpsecVpnValidator(self))

View File

@ -41,7 +41,7 @@ class VpnEndpointGroupsTestCase(base.ExtensionTestCase):
fmt = 'json'
def setUp(self):
super(VpnEndpointGroupsTestCase, self).setUp()
super().setUp()
plural_mappings = {'endpoint_group': 'endpoint-groups'}
self.setup_extension(
'neutron_vpnaas.tests.unit.extensions.test_vpn_endpoint_groups.'

View File

@ -32,7 +32,7 @@ class VpnaasExtensionTestCase(base.ExtensionTestCase):
fmt = 'json'
def setUp(self):
super(VpnaasExtensionTestCase, self).setUp()
super().setUp()
plural_mappings = {'ipsecpolicy': 'ipsecpolicies',
'ikepolicy': 'ikepolicies',
'ipsec_site_connection': 'ipsec-site-connections',

View File

@ -22,7 +22,7 @@ from neutron_vpnaas.services.vpn.common import netns_wrapper as nswrap
class TestNetnsWrapper(base.BaseTestCase):
def setUp(self):
super(TestNetnsWrapper, self).setUp()
super().setUp()
patch_methods = ['filter_command',
'execute',
'setup_conf']

View File

@ -297,32 +297,33 @@ EXPECTED_IPSEC_STRONGSWAN_SECRET_CONF = '''
60.0.0.4 60.0.0.6 : PSK 0scGFzc3dvcmQ=
''' % FAKE_VPNSERVICE_ID
PLUTO_ACTIVE_STATUS = """000 "%(conn_id)s/0x1": erouted;\n
000 #4: "%(conn_id)s/0x1":500 STATE_QUICK_R2 (IPsec SA established); \
newest IPSEC;""" % {
'conn_id': FAKE_IPSEC_SITE_CONNECTION2_ID}
PLUTO_ACTIVE_STATUS_IKEV2 = """000 "%(conn_id)s/0x1": erouted;\n
000 #4: "%(conn_id)s/0x1":500 STATE_PARENT_R2 (PARENT SA established); \
newest IPSEC;""" % {
'conn_id': FAKE_IPSEC_SITE_CONNECTION2_ID}
PLUTO_MULTIPLE_SUBNETS_ESTABLISHED_STATUS = """000 "%(conn_id1)s/1x1": erouted;\n
000 #4: "%(conn_id1)s/1x1":500 STATE_QUICK_R2 (IPsec SA established); \
PLUTO_ACTIVE_STATUS = """000 "{conn_id}/0x1": erouted;\n
000 #4: "{conn_id}/0x1":500 STATE_QUICK_R2 (IPsec SA established); \
newest IPSEC;""".format(
conn_id=FAKE_IPSEC_SITE_CONNECTION2_ID)
PLUTO_ACTIVE_STATUS_IKEV2 = """000 "{conn_id}/0x1": erouted;\n
000 #4: "{conn_id}/0x1":500 STATE_PARENT_R2 (PARENT SA established); \
newest IPSEC;""".format(
conn_id=FAKE_IPSEC_SITE_CONNECTION2_ID)
PLUTO_MULTIPLE_SUBNETS_ESTABLISHED_STATUS = """000 "{conn_id1}/1x1": erouted;\n
000 #4: "{conn_id1}/1x1":500 STATE_QUICK_R2 (IPsec SA established); \
newest IPSEC;\n
000 "%(conn_id2)s/2x1": erouted;\n
000 #4: "%(conn_id2)s/2x1":500 STATE_QUICK_R2 (IPsec SA established); \
newest IPSEC;\n""" % { # noqa: E501
'conn_id1': FAKE_IPSEC_SITE_CONNECTION1_ID,
'conn_id2': FAKE_IPSEC_SITE_CONNECTION2_ID}
PLUTO_ACTIVE_NO_IPSEC_SA_STATUS = """000 "%(conn_id)s/0x1": erouted;\n
000 #258: "%(conn_id)s/0x1":500 STATE_MAIN_R2 (sent MR2, expecting MI3);""" % {
000 "{conn_id2}/2x1": erouted;\n
000 #4: "{conn_id2}/2x1":500 STATE_QUICK_R2 (IPsec SA established); \
newest IPSEC;\n""".format( # noqa: E501
conn_id1=FAKE_IPSEC_SITE_CONNECTION1_ID,
conn_id2=FAKE_IPSEC_SITE_CONNECTION2_ID)
PLUTO_ACTIVE_NO_IPSEC_SA_STATUS = """000 "{conn_id}/0x1": erouted;\n
000 #258: "{conn_id}/0x1":500 STATE_MAIN_R2
(sent MR2, expecting MI3);""".format(
conn_id=FAKE_IPSEC_SITE_CONNECTION2_ID)
PLUTO_DOWN_STATUS = "000 \"%(conn_id)s/0x1\": unrouted;" % {
'conn_id': FAKE_IPSEC_SITE_CONNECTION2_ID}
PLUTO_DOWN_STATUS = "000 \"%(conn_id)s/0x1\": unrouted;" % {'conn_id':
FAKE_IPSEC_SITE_CONNECTION2_ID}
CHARON_ACTIVE_STATUS = "%(conn_id)s{1}: INSTALLED, TUNNEL" % {'conn_id':
FAKE_IPSEC_SITE_CONNECTION2_ID}
CHARON_DOWN_STATUS = "%(conn_id)s{1}: ROUTED, TUNNEL" % {'conn_id':
FAKE_IPSEC_SITE_CONNECTION2_ID}
CHARON_ACTIVE_STATUS = "%(conn_id)s{1}: INSTALLED, TUNNEL" % {
'conn_id': FAKE_IPSEC_SITE_CONNECTION2_ID}
CHARON_DOWN_STATUS = "%(conn_id)s{1}: ROUTED, TUNNEL" % {
'conn_id': FAKE_IPSEC_SITE_CONNECTION2_ID}
NOT_RUNNING_STATUS = "Command: ['ipsec', 'status'] Exit code: 3 Stdout:"
@ -331,7 +332,7 @@ class BaseIPsecDeviceDriver(base.BaseTestCase):
def setUp(self, driver=openswan_ipsec.OpenSwanDriver,
ipsec_process=openswan_ipsec.OpenSwanProcess,
vpnservice=FAKE_VPN_SERVICE):
super(BaseIPsecDeviceDriver, self).setUp()
super().setUp()
for klass in [
'neutron_lib.rpc.Connection',
'oslo_service.loopingcall.FixedIntervalLoopingCall'
@ -431,7 +432,7 @@ class IPSecDeviceLegacy(BaseIPsecDeviceDriver):
def setUp(self, driver=openswan_ipsec.OpenSwanDriver,
ipsec_process=openswan_ipsec.OpenSwanProcess):
super(IPSecDeviceLegacy, self).setUp(driver, ipsec_process)
super().setUp(driver, ipsec_process)
self._make_router_info_for_test()
def _make_router_info_for_test(self):
@ -903,7 +904,7 @@ class IPSecDeviceDVR(BaseIPsecDeviceDriver):
def setUp(self, driver=openswan_ipsec.OpenSwanDriver,
ipsec_process=openswan_ipsec.OpenSwanProcess):
super(IPSecDeviceDVR, self).setUp(driver, ipsec_process)
super().setUp(driver, ipsec_process)
mock.patch.object(dvr_snat_ns.SnatNamespace, 'create').start()
self._make_dvr_edge_router_info_for_test()
@ -972,8 +973,7 @@ class TestOpenSwanConfigGeneration(BaseIPsecDeviceDriver):
def setUp(self, driver=openswan_ipsec.OpenSwanDriver,
ipsec_process=openswan_ipsec.OpenSwanProcess):
super(TestOpenSwanConfigGeneration, self).setUp(
driver, ipsec_process, vpnservice=FAKE_VPN_SERVICE)
super().setUp(driver, ipsec_process, vpnservice=FAKE_VPN_SERVICE)
self.conf.register_opts(openswan_ipsec.openswan_opts, 'openswan')
self.conf.set_override('state_path', '/tmp')
self.ipsec_template = self.conf.openswan.ipsec_config_template
@ -985,7 +985,8 @@ class TestOpenSwanConfigGeneration(BaseIPsecDeviceDriver):
def build_ipsec_expected_config_for_test(self, info):
"""Modify OpenSwan ipsec expected config files for test variations."""
auth_mode = info.get('ipsec_auth', AUTH_ESP)
conn_details = OPENSWAN_CONNECTION_DETAILS % {'auth_mode': auth_mode,
conn_details = OPENSWAN_CONNECTION_DETAILS % {
'auth_mode': auth_mode,
'dpd_action': 'hold',
'dpd_delay': 30,
'dpd_timeout': 120,
@ -1004,14 +1005,14 @@ class TestOpenSwanConfigGeneration(BaseIPsecDeviceDriver):
local_cidrs.append("=%s" % cidr[0])
for net in cidr:
version = netaddr.IPNetwork(net).version
virtual_privates.append('%%v%s:%s' % (version, net))
virtual_privates.append('%v{}:{}'.format(version, net))
# Convert peer CIDRs into space separated strings
cidrs = info.get('peer_cidrs', [['20.0.0.0/24', '30.0.0.0/24'],
['40.0.0.0/24', '50.0.0.0/24']])
for cidr in cidrs:
for net in cidr:
version = netaddr.IPNetwork(net).version
virtual_privates.append('%%v%s:%s' % (version, net))
virtual_privates.append('%v{}:{}'.format(version, net))
peer_cidrs = [' '.join(cidr) for cidr in cidrs]
local_ip = info.get('local', '60.0.0.4')
local_id = info.get('local_id')
@ -1097,7 +1098,7 @@ class IPsecStrongswanConfigGeneration(BaseIPsecDeviceDriver):
def setUp(self, driver=strongswan_ipsec.StrongSwanDriver,
ipsec_process=strongswan_ipsec.StrongSwanProcess):
super(IPsecStrongswanConfigGeneration, self).setUp(
super().setUp(
driver, ipsec_process, vpnservice=FAKE_VPN_SERVICE)
self.conf.register_opts(strongswan_ipsec.strongswan_opts,
'strongswan')
@ -1203,7 +1204,7 @@ class TestOpenSwanProcess(IPSecDeviceLegacy):
def setUp(self, driver=openswan_ipsec.OpenSwanDriver,
ipsec_process=openswan_ipsec.OpenSwanProcess):
super(TestOpenSwanProcess, self).setUp(driver, ipsec_process)
super().setUp(driver, ipsec_process)
self.conf.register_opts(openswan_ipsec.openswan_opts,
'openswan')
self.conf.set_override('state_path', '/tmp')
@ -1414,7 +1415,7 @@ class TestOpenSwanProcess(IPSecDeviceLegacy):
@mock.patch('os.path.exists', return_value=True)
@mock.patch('neutron_vpnaas.services.vpn.device_drivers.ipsec.open',
create=True,
side_effect=[io.StringIO(u'invalid'),
side_effect=[io.StringIO('invalid'),
IOError])
def test_process_running_bogus_pid(self, mock_open, mock_exists):
with mock.patch.object(openswan_ipsec.LOG, 'error'):
@ -1425,7 +1426,7 @@ class TestOpenSwanProcess(IPSecDeviceLegacy):
@mock.patch('os.path.exists', return_value=True)
@mock.patch('neutron_vpnaas.services.vpn.device_drivers.ipsec.open',
create=True,
side_effect=[io.StringIO(u'134'), io.StringIO(u'')])
side_effect=[io.StringIO('134'), io.StringIO('')])
def test_process_running_no_cmdline(self, mock_open, mock_exists):
with mock.patch.object(openswan_ipsec.LOG, 'error') as log_mock:
self.assertFalse(self.process._process_running())
@ -1435,7 +1436,7 @@ class TestOpenSwanProcess(IPSecDeviceLegacy):
@mock.patch('os.path.exists', return_value=True)
@mock.patch('neutron_vpnaas.services.vpn.device_drivers.ipsec.open',
create=True,
side_effect=[io.StringIO(u'134'), io.StringIO(u'ps ax')])
side_effect=[io.StringIO('134'), io.StringIO('ps ax')])
def test_process_running_cmdline_mismatch(self, mock_open, mock_exists):
with mock.patch.object(openswan_ipsec.LOG, 'error') as log_mock:
self.assertFalse(self.process._process_running())
@ -1445,8 +1446,8 @@ class TestOpenSwanProcess(IPSecDeviceLegacy):
@mock.patch('os.path.exists', return_value=True)
@mock.patch('neutron_vpnaas.services.vpn.device_drivers.ipsec.open',
create=True,
side_effect=[io.StringIO(u'134'),
io.StringIO(u'/usr/libexec/ipsec/pluto -ctlbase'
side_effect=[io.StringIO('134'),
io.StringIO('/usr/libexec/ipsec/pluto -ctlbase'
'/some/foo/path')])
def test_process_running_cmdline_match(self, mock_open, mock_exists):
self.process.pid_path = '/some/foo/path'
@ -1491,7 +1492,7 @@ class TestOpenSwanProcess(IPSecDeviceLegacy):
class TestLibreSwanProcess(base.BaseTestCase):
def setUp(self):
super(TestLibreSwanProcess, self).setUp()
super().setUp()
self.vpnservice = copy.deepcopy(FAKE_VPN_SERVICE)
self.ipsec_process = libreswan_ipsec.LibreSwanProcess(cfg.CONF,
@ -1595,8 +1596,7 @@ class IPsecStrongswanDeviceDriverLegacy(IPSecDeviceLegacy):
def setUp(self, driver=strongswan_ipsec.StrongSwanDriver,
ipsec_process=strongswan_ipsec.StrongSwanProcess):
super(IPsecStrongswanDeviceDriverLegacy, self).setUp(driver,
ipsec_process)
super().setUp(driver, ipsec_process)
self.conf.register_opts(strongswan_ipsec.strongswan_opts,
'strongswan')
self.conf.set_override('state_path', '/tmp')
@ -1625,5 +1625,4 @@ class IPsecStrongswanDeviceDriverLegacy(IPSecDeviceLegacy):
class IPsecStrongswanDeviceDriverDVR(IPSecDeviceDVR):
def setUp(self, driver=strongswan_ipsec.StrongSwanDriver,
ipsec_process=strongswan_ipsec.StrongSwanProcess):
super(IPsecStrongswanDeviceDriverDVR, self).setUp(driver,
ipsec_process)
super().setUp(driver, ipsec_process)

View File

@ -75,7 +75,8 @@ class TestDeviceManager(base.BaseTestCase):
self.conf = cfg.CONF
self.conf.register_opts(common_config.core_opts)
self.conf.register_opts(agent_config.INTERFACE_DRIVER_OPTS)
self.conf.set_override('interface_driver',
self.conf.set_override(
'interface_driver',
'neutron_vpnaas.tests.unit.services.vpn.device_drivers'
'.test_ovn_ipsec.fake_interface_driver')
self.host = "some-hostname"
@ -181,7 +182,8 @@ class TestOvnStrongSwanDriver(test_ipsec.IPSecDeviceLegacy):
conf = cfg.CONF
conf.register_opts(common_config.core_opts)
conf.register_opts(agent_config.INTERFACE_DRIVER_OPTS)
conf.set_override('interface_driver',
conf.set_override(
'interface_driver',
'neutron_vpnaas.tests.unit.services.vpn.device_drivers'
'.test_ovn_ipsec.fake_interface_driver')

View File

@ -49,13 +49,13 @@ class FakeSqlQueryObject(dict):
def __init__(self, **entries):
self.__dict__.update(entries)
super(FakeSqlQueryObject, self).__init__(**entries)
super().__init__(**entries)
class TestValidatorSelection(base.BaseTestCase):
def setUp(self):
super(TestValidatorSelection, self).setUp()
super().setUp()
vpnaas_provider = [{
'service_type': constants.VPN,
'name': 'vpnaas',
@ -86,7 +86,7 @@ class TestValidatorSelection(base.BaseTestCase):
class TestIPsecDriver(base.BaseTestCase):
def setUp(self):
super(TestIPsecDriver, self).setUp()
super().setUp()
mock.patch('neutron_lib.rpc.Connection').start()
l3_agent = mock.Mock()

View File

@ -54,10 +54,10 @@ class FakeSqlQueryObject(dict):
def __init__(self, **entries):
self.__dict__.update(entries)
super(FakeSqlQueryObject, self).__init__(**entries)
super().__init__(**entries)
class FakeGatewayDB(object):
class FakeGatewayDB:
def __init__(self):
self.gateways_by_router = {}
self.gateways_by_id = {}

View File

@ -60,14 +60,13 @@ class TestVPNDriverPlugin(test_db_vpnaas.TestVpnaas,
self.driver.validator = driver_validator.VpnDriverValidator(
self.driver)
driver_cls.return_value = self.driver
super(TestVPNDriverPlugin, self).setUp(
vpnaas_plugin=VPN_DRIVER_CLASS)
super().setUp(vpnaas_plugin=VPN_DRIVER_CLASS)
# Note: Context must be created after BaseTestCase.setUp() so that
# config for policy is set.
self.adminContext = context.get_admin_context()
def test_create_ipsec_site_connection(self, **extras):
super(TestVPNDriverPlugin, self).test_create_ipsec_site_connection()
super().test_create_ipsec_site_connection()
self.driver.create_ipsec_site_connection.assert_called_once_with(
mock.ANY, mock.ANY)
self.driver.delete_ipsec_site_connection.assert_called_once_with(
@ -79,7 +78,7 @@ class TestVPNDriverPlugin(test_db_vpnaas.TestVpnaas,
return_value=self.driver).start()
stm = directory.get_plugin(p_constants.VPN).service_type_manager
stm.add_resource_association = mock.Mock()
super(TestVPNDriverPlugin, self).test_create_vpnservice()
super().test_create_vpnservice()
self.driver.create_vpnservice.assert_called_once_with(
mock.ANY, mock.ANY)
stm.add_resource_association.assert_called_once_with(
@ -88,14 +87,14 @@ class TestVPNDriverPlugin(test_db_vpnaas.TestVpnaas,
def test_delete_vpnservice(self, **extras):
stm = directory.get_plugin(p_constants.VPN).service_type_manager
stm.del_resource_associations = mock.Mock()
super(TestVPNDriverPlugin, self).test_delete_vpnservice()
super().test_delete_vpnservice()
self.driver.delete_vpnservice.assert_called_once_with(
mock.ANY, mock.ANY)
stm.del_resource_associations.assert_called_once_with(
mock.ANY, [mock.ANY])
def test_update_vpnservice(self, **extras):
super(TestVPNDriverPlugin, self).test_update_vpnservice()
super().test_update_vpnservice()
self.driver.update_vpnservice.assert_called_once_with(
mock.ANY, mock.ANY, mock.ANY)
@ -181,7 +180,7 @@ class TestVPNDriverPlugin(test_db_vpnaas.TestVpnaas,
class TestVPNDriverPluginMultipleDrivers(base.BaseTestCase):
def setUp(self):
super(TestVPNDriverPluginMultipleDrivers, self).setUp()
super().setUp()
vpnaas_providers = [
{'service_type': p_constants.VPN,
'name': 'ipsec',

View File

@ -49,7 +49,7 @@ class NoopDeviceDriver(device_drivers.DeviceDriver):
class VPNBaseTestCase(base.BaseTestCase):
def setUp(self):
super(VPNBaseTestCase, self).setUp()
super().setUp()
self.conf = cfg.CONF
self.ri_kwargs = {'router': {'id': FAKE_ROUTER_ID, 'ha': False},
'agent_conf': self.conf,
@ -59,7 +59,7 @@ class VPNBaseTestCase(base.BaseTestCase):
class TestVirtualPrivateNetworkDeviceDriverLoading(VPNBaseTestCase):
def setUp(self):
super(TestVirtualPrivateNetworkDeviceDriverLoading, self).setUp()
super().setUp()
cfg.CONF.register_opts(vpn_agent.vpn_agent_opts, 'vpnagent')
self.agent = mock.Mock()
self.agent.conf = cfg.CONF

View File

@ -13,7 +13,6 @@
# under the License.
import os
import socket
import stat
import time
@ -54,7 +53,7 @@ def execute_cmd_over_ssh(host, cmd, private_key):
"AUTHENTICATION EXCEPTION WHEN CONNECTING TO %s", host["ip"], e)
except paramiko.SSHException as e:
raise Exception("SSH EXCEPTION WHEN CONNECTING TO %s", host["ip"], e)
except socket.error as e:
except OSError as e:
raise Exception("SOCKET ERROR WHEN CONNECTING TO %s", host["ip"], e)
LOG.debug("CONNECTED TO HOST <%s>", host["ip"])
try:
@ -254,7 +253,7 @@ def write_key_to_compute_node(keypair, local_path, remote_path, host,
try:
sftp_client = paramiko.SFTPClient.from_transport(transport)
sftp_client.put(local_path, remote_path)
except IOError as e:
except OSError as e:
raise Exception("FILE PATH DOESN'T EXIST", e)
finally:
transport.close()
@ -429,7 +428,7 @@ def get_interfaces(namespace_controller_tuple, private_key):
namespace, controller = namespace_controller_tuple
LOG.debug("GET THE INTERFACES BY USING 'ip a' FROM THE NAMESPACE %s",
namespace)
cmd = "sudo ip netns exec {} ip a".format(namespace)
cmd = f"sudo ip netns exec {namespace} ip a"
interfaces = execute_cmd_over_ssh(controller, cmd, private_key)
LOG.debug("INTERFACES %s", interfaces)
return interfaces
@ -621,7 +620,7 @@ def delete_keyfiles(local_key_files, remote_key_files=None,
if ns_compute_tuples:
LOG.debug("DELETING RALLY KEY FILES FROM COMPUTE HOSTS")
for key, ns_comp in zip(remote_key_files, ns_compute_tuples):
cmd = "sudo rm -f {}".format(key)
cmd = f"sudo rm -f {key}"
host = ns_comp[1]
execute_cmd_over_ssh(host, cmd, private_key)

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

View File

@ -35,7 +35,7 @@ class ASTWalker(compiler.visitor.ASTVisitor):
compiler.visitor.ASTVisitor.default(self, node, *args)
class Visitor(object):
class Visitor:
def __init__(self, filename, i18n_msg_predicates,
msg_format_checkers, debug):