Fix pylint checks

pylint 2.10.2 introduced new checkers.

Fixed redundant-u-string-prefix, use-dict-literal, use-list-literal, and
unspecified-encoding warnings

Change-Id: Ifdb1acb7e53ca2acfbef8c34dbb360d738117439
This commit is contained in:
Gregory Thiemonge 2021-08-24 16:02:08 +02:00 committed by Michael Johnson
parent 5c8f60614e
commit 20da8e1aa3
14 changed files with 47 additions and 37 deletions

View File

@ -127,8 +127,8 @@ class AmphoraInfo(object):
def _get_meminfo(self):
re_parser = re.compile(r'^(?P<key>\S*):\s*(?P<value>\d*)\s*kB')
result = dict()
with open('/proc/meminfo', 'r') as meminfo:
result = {}
with open('/proc/meminfo', 'r', encoding='utf-8') as meminfo:
for line in meminfo:
match = re_parser.match(line)
if not match:
@ -138,7 +138,7 @@ class AmphoraInfo(object):
return result
def _cpu(self):
with open('/proc/stat') as f:
with open('/proc/stat', encoding='utf-8') as f:
cpu = f.readline()
vals = cpu.split(' ')
return {
@ -153,13 +153,13 @@ class AmphoraInfo(object):
}
def _load(self):
with open('/proc/loadavg') as f:
with open('/proc/loadavg', encoding='utf-8') as f:
load = f.readline()
vals = load.split(' ')
return vals[:3]
def _get_networks(self):
networks = dict()
networks = {}
with pyroute2.NetNS(consts.AMPHORA_NAMESPACE) as netns:
for interface in netns.get_links():
interface_name = None

View File

@ -146,7 +146,8 @@ class Keepalived(object):
keepalived_pid_path = util.keepalived_pid_path()
try:
# Is there a pid file for keepalived?
with open(keepalived_pid_path, 'r') as pid_file:
with open(keepalived_pid_path,
'r', encoding='utf-8') as pid_file:
pid = int(pid_file.readline())
os.kill(pid, 0)

View File

@ -189,7 +189,8 @@ class KeepalivedLvs(lvs_listener_base.LvsListenerApiServerBase):
:param listener_id: the id of the listener
"""
self._check_lvs_listener_exists(listener_id)
with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as file:
with open(util.keepalived_lvs_cfg_path(listener_id),
'r', encoding='utf-8') as file:
cfg = file.read()
resp = webob.Response(cfg, content_type='text/plain')
return resp
@ -242,7 +243,7 @@ class KeepalivedLvs(lvs_listener_base.LvsListenerApiServerBase):
'/proc', util.get_keepalivedlvs_pid(listener_id))):
# Check if the listener is disabled
with open(util.keepalived_lvs_cfg_path(listener_id),
'r') as file:
'r', encoding='utf-8') as file:
cfg = file.read()
m = re.search('virtual_server', cfg)
if m:
@ -256,7 +257,7 @@ class KeepalivedLvs(lvs_listener_base.LvsListenerApiServerBase):
Gets the status of all UDP listeners on the amphora.
"""
listeners = list()
listeners = []
for lvs_listener in util.get_lvs_listeners():
status = self._check_lvs_listener_status(lvs_listener)

View File

@ -82,7 +82,7 @@ class Loadbalancer(object):
:param listener_id: the id of the listener
"""
self._check_lb_exists(lb_id)
with open(util.config_path(lb_id), 'r') as file:
with open(util.config_path(lb_id), 'r', encoding='utf-8') as file:
cfg = file.read()
resp = webob.Response(cfg, content_type='text/plain')
resp.headers['ETag'] = (
@ -365,7 +365,7 @@ class Loadbalancer(object):
Currently type==SSL is also not detected
"""
listeners = list()
listeners = []
for lb in util.get_loadbalancers():
stats_socket, listeners_on_lb = util.parse_haproxy_file(lb)
@ -414,7 +414,7 @@ class Loadbalancer(object):
details="No certificate with filename: {f}".format(
f=filename)), status=404)
with open(cert_path, 'r') as crt_file:
with open(cert_path, 'r', encoding='utf-8') as crt_file:
cert = crt_file.read()
md5sum = md5(octavia_utils.b(cert),
usedforsecurity=False).hexdigest() # nosec
@ -433,7 +433,8 @@ class Loadbalancer(object):
if os.path.exists(
os.path.join('/proc', util.get_haproxy_pid(lb_id))):
# Check if the listener is disabled
with open(util.config_path(lb_id), 'r') as file:
with open(util.config_path(lb_id),
'r', encoding='utf-8') as file:
cfg = file.read()
m = re.findall('^frontend (.*)$', cfg, re.MULTILINE)
return m or []

View File

@ -116,13 +116,13 @@ def config_path(lb_id):
def get_haproxy_pid(lb_id):
with open(pid_path(lb_id), 'r') as f:
with open(pid_path(lb_id), 'r', encoding='utf-8') as f:
return f.readline().rstrip()
def get_keepalivedlvs_pid(listener_id):
pid_file = keepalived_lvs_pids_path(listener_id)[0]
with open(pid_file, 'r') as f:
with open(pid_file, 'r', encoding='utf-8') as f:
return f.readline().rstrip()
@ -224,7 +224,8 @@ def is_lvs_listener_running(listener_id):
def get_os_init_system():
if os.path.exists(consts.INIT_PROC_COMM_PATH):
with open(consts.INIT_PROC_COMM_PATH, 'r') as init_comm:
with open(consts.INIT_PROC_COMM_PATH,
'r', encoding='utf-8') as init_comm:
init_proc_name = init_comm.read().rstrip('\n')
if init_proc_name == consts.INIT_SYSTEMD:
return consts.INIT_SYSTEMD
@ -293,7 +294,7 @@ def get_backend_for_lb_object(object_id):
def parse_haproxy_file(lb_id):
with open(config_path(lb_id), 'r') as file:
with open(config_path(lb_id), 'r', encoding='utf-8') as file:
cfg = file.read()
listeners = {}
@ -345,7 +346,8 @@ def vrrp_check_script_update(lb_id, action):
# If no LBs are found, so make sure keepalived thinks haproxy is down.
if not lb_ids:
if not lvs_ids:
with open(haproxy_check_script_path(), 'w') as text_file:
with open(haproxy_check_script_path(),
'w', encoding='utf-8') as text_file:
text_file.write('exit 1')
else:
try:
@ -362,7 +364,7 @@ def vrrp_check_script_update(lb_id, action):
args.append(haproxy_sock_path(lbid))
cmd = 'haproxy-vrrp-check {args}; exit $?'.format(args=' '.join(args))
with open(haproxy_check_script_path(), 'w') as text_file:
with open(haproxy_check_script_path(), 'w', encoding='utf-8') as text_file:
text_file.write(cmd)
@ -373,7 +375,7 @@ def get_haproxy_vip_addresses(lb_id):
:returns: List of VIP addresses (IPv4 and IPv6)
"""
vips = []
with open(config_path(lb_id), 'r') as file:
with open(config_path(lb_id), 'r', encoding='utf-8') as file:
for line in file:
current_line = line.strip()
if current_line.startswith('bind'):

View File

@ -118,7 +118,8 @@ def run_sender(cmd_queue):
# heartbeat
if os.path.isfile(keepalived_cfg_path):
# Is there a pid file for keepalived?
with open(keepalived_pid_path, 'r') as pid_file:
with open(keepalived_pid_path,
'r', encoding='utf-8') as pid_file:
pid = int(pid_file.readline())
os.kill(pid, 0)
@ -251,7 +252,7 @@ def build_stats_message():
pool_status = (
keepalivedlvs_query.get_lvs_listener_pool_status(
listener_id))
lvs_listener_dict = dict()
lvs_listener_dict = {}
lvs_listener_dict['status'] = listener_stats['status']
lvs_listener_dict['stats'] = {
'tx': delta_values['bout'],

View File

@ -53,7 +53,7 @@ class HAProxyQuery(object):
try:
sock.send(octavia_utils.b(query + '\n'))
data = u''
data = ''
while True:
x = sock.recv(1024)
if not x:
@ -68,7 +68,7 @@ class HAProxyQuery(object):
"""Get and parse output from 'show info' command."""
results = self._query('show info')
dict_results = dict()
dict_results = {}
for r in results.split('\n'):
vals = r.split(":", 1)
dict_results[vals[0].strip()] = vals[1].strip()

View File

@ -147,7 +147,8 @@ def get_lvs_listener_resource_ipports_nsname(listener_id):
# 'ipport': ipport}],
# 'HealthMonitor': {'id': healthmonitor-id}}
resource_ipport_mapping = {}
with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as f:
with open(util.keepalived_lvs_cfg_path(listener_id),
'r', encoding='utf-8') as f:
cfg = f.read()
ns_name = NS_REGEX.findall(cfg)[0]
listener_ip_port = V4_VS_REGEX.findall(cfg)
@ -255,7 +256,8 @@ def get_lvs_listener_pool_status(listener_id):
# updating to a recent keepalived release.
restarting = config_stat.st_mtime > check_pid_stat.st_mtime
with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as f:
with open(util.keepalived_lvs_cfg_path(listener_id),
'r', encoding='utf-8') as f:
cfg = f.read()
hm_enabled = len(CHECKER_REGEX.findall(cfg)) > 0
@ -323,7 +325,7 @@ def get_ipvsadm_info(ns_name, is_stats_cmd=False):
# mapping = {'listeneripport': {'Linstener': vs_values,
# 'members': [rs_values1, rs_values2]}}
last_key = None
value_mapping = dict()
value_mapping = {}
output_line_num = len(output)
def split_line(line):
@ -382,8 +384,8 @@ def get_lvs_listeners_stats():
need_check_listener_ids = [
listener_id for listener_id in lvs_listener_ids
if util.is_lvs_listener_running(listener_id)]
ipport_mapping = dict()
listener_stats_res = dict()
ipport_mapping = {}
listener_stats_res = {}
for check_listener_id in need_check_listener_ids:
# resource_ipport_mapping = {'Listener': {'id': listener-id,
# 'ipport': ipport},

View File

@ -41,8 +41,8 @@ class NetworkNamespace(object):
def __enter__(self):
# Save the current network namespace
# pylint: disable=consider-using-with
self.current_netns_fd = open(self.current_netns)
with open(self.target_netns) as fd:
self.current_netns_fd = open(self.current_netns, encoding='utf-8')
with open(self.target_netns, encoding='utf-8') as fd:
self.set_netns(fd.fileno(), self.CLONE_NEWNET)
def __exit__(self, *args):

View File

@ -513,7 +513,7 @@ class HaproxyAmphoraLoadBalancerDriver(
# 'client_cert': client_full_filename,
# 'ca_cert': ca_cert_full_filename,
# 'crl': crl_full_filename}}
pool_certs_dict = dict()
pool_certs_dict = {}
for pool in listener.pools:
if pool.id not in pool_certs_dict:
pool_certs_dict[pool.id] = self._process_pool_certs(
@ -527,7 +527,7 @@ class HaproxyAmphoraLoadBalancerDriver(
return pool_certs_dict
def _process_pool_certs(self, listener, pool, amphora, obj_id):
pool_cert_dict = dict()
pool_cert_dict = {}
# Handle the client cert(s) and key
if pool.tls_certificate_id:

View File

@ -111,7 +111,7 @@ class LocalCertManager(cert_mgr.CertManager):
filename_intermediates = "{0}.int".format(filename_base)
filename_pkp = "{0}.pass".format(filename_base)
cert_data = dict()
cert_data = {}
flags = os.O_RDONLY
try:

View File

@ -224,7 +224,7 @@ def sctp_health_check(ip_address, port, timeout=2):
if send_abort:
has_sctp_support = False
with open("/proc/net/protocols") as fp:
with open("/proc/net/protocols", encoding='utf-8') as fp:
for line in fp:
if line.startswith('SCTP'):
has_sctp_support = True

View File

@ -158,7 +158,8 @@ class CertComputeCreate(ComputeCreate):
"""
# load client certificate
with open(CONF.controller_worker.client_ca, 'r') as client_ca:
with open(CONF.controller_worker.client_ca,
'r', encoding='utf-8') as client_ca:
ca = client_ca.read()
key = utils.get_compatible_server_certs_key_passphrase()

View File

@ -165,7 +165,8 @@ class CertComputeCreate(ComputeCreate):
"""
# load client certificate
with open(CONF.controller_worker.client_ca, 'r') as client_ca:
with open(CONF.controller_worker.client_ca,
'r', encoding='utf-8') as client_ca:
ca = client_ca.read()
key = utils.get_compatible_server_certs_key_passphrase()