Fix some new pylint errors

Also squelch warnings about f-string usage for now (unless folks think
we should do that entire refactor here).

Change-Id: I193abaebdaca10d353e2dbaa14df110048ffaa58
This commit is contained in:
Adam Harwell 2021-09-16 21:46:37 -07:00
parent b8ddbc94a1
commit ba5b4a7fb5
5 changed files with 31 additions and 27 deletions

View File

@ -39,6 +39,7 @@ disable=
invalid-name,
line-too-long,
missing-docstring,
consider-using-f-string,
# "R" Refactor recommendations
duplicate-code,
interface-not-implemented,

View File

@ -13,8 +13,7 @@
# under the License.
import fcntl
import socket
from struct import pack
from struct import unpack
import struct
from oslo_log import log as logging
@ -51,11 +50,11 @@ def garp(interface, ip_address, net_ns=None):
source_mac = garp_socket.getsockname()[4]
garp_msg = [
pack('!h', 1), # Hardware type ethernet
pack('!h', 0x0800), # Protocol type IPv4
pack('!B', 6), # Hardware size
pack('!B', 4), # Protocol size
pack('!h', 1), # Opcode request
struct.pack('!h', 1), # Hardware type ethernet
struct.pack('!h', 0x0800), # Protocol type IPv4
struct.pack('!B', 6), # Hardware size
struct.pack('!B', 4), # Protocol size
struct.pack('!h', 1), # Opcode request
source_mac, # Sender MAC address
socket.inet_aton(ip_address), # Sender IP address
BROADCAST_MAC, # Target MAC address
@ -64,7 +63,7 @@ def garp(interface, ip_address, net_ns=None):
garp_ethernet = [
BROADCAST_MAC, # Ethernet destination
source_mac, # Ethernet source
pack('!h', ARP_ETHERTYPE), # Ethernet type
struct.pack('!h', ARP_ETHERTYPE), # Ethernet type
b''.join(garp_msg)] # The GARP message
garp_socket.send(b''.join(garp_ethernet))
@ -81,7 +80,7 @@ def calculate_icmpv6_checksum(packet):
# Add up 16-bit words
num_words = len(packet) // 2
for chunk in unpack("!%sH" % num_words, packet[0:num_words * 2]):
for chunk in struct.unpack("!%sH" % num_words, packet[0:num_words * 2]):
total += chunk
# Add any left over byte
@ -124,32 +123,33 @@ def neighbor_advertisement(interface, ip_address, net_ns=None):
# Get the byte representation of the MAC address of the interface
# Note: You can't use getsockname() to get the MAC on this type of socket
source_mac = fcntl.ioctl(na_socket.fileno(), SIOCGIFHWADDR, pack('256s',
bytes(interface, 'utf-8')))[18:24]
source_mac = fcntl.ioctl(
na_socket.fileno(), SIOCGIFHWADDR, struct.pack(
'256s', bytes(interface, 'utf-8')))[18:24]
# Get the byte representation of the source IP address
source_ip_bytes = socket.inet_pton(socket.AF_INET6, ip_address)
icmpv6_na_msg_prefix = [
pack('!B', 136), # ICMP Type Neighbor Advertisement
pack('!B', 0)] # ICMP Code
struct.pack('!B', 136), # ICMP Type Neighbor Advertisement
struct.pack('!B', 0)] # ICMP Code
icmpv6_na_msg_postfix = [
pack('!I', 0xa0000000), # Flags (Router, Override)
struct.pack('!I', 0xa0000000), # Flags (Router, Override)
source_ip_bytes, # Target address
pack('!B', 2), # ICMPv6 option type target link-layer address
pack('!B', 1), # ICMPv6 option length
struct.pack('!B', 2), # ICMPv6 option type target link-layer address
struct.pack('!B', 1), # ICMPv6 option length
source_mac] # ICMPv6 option link-layer address
# Calculate the ICMPv6 checksum
icmpv6_pseudo_header = [
source_ip_bytes, # Source IP address
source_ip_bytes, # Source IP address
socket.inet_pton(socket.AF_INET6, ALL_NODES_ADDR), # Destination IP
pack('!I', 58), # IPv6 next header (ICMPv6)
pack('!h', 32)] # IPv6 payload length
icmpv6_tmp_chksum = pack('!H', 0) # Checksum are zeros for calculation
struct.pack('!I', 58), # IPv6 next header (ICMPv6)
struct.pack('!h', 32)] # IPv6 payload length
icmpv6_tmp_chksum = struct.pack('!H', 0) # Checksum->zeros for calculation
tmp_chksum_msg = b''.join(icmpv6_pseudo_header + icmpv6_na_msg_prefix +
[icmpv6_tmp_chksum] + icmpv6_pseudo_header)
checksum = pack('!H', calculate_icmpv6_checksum(tmp_chksum_msg))
checksum = struct.pack('!H', calculate_icmpv6_checksum(tmp_chksum_msg))
# Build the ICMPv6 unsolicitated neighbor advertisement
icmpv6_msg = b''.join(icmpv6_na_msg_prefix + [checksum] +

View File

@ -455,8 +455,8 @@ class ListenersController(base.BaseController):
constants.PROTOCOL_TERMINATED_HTTPS)
# Make sure we have a client CA cert if they enable client auth
if ((listener.client_authentication != wtypes.Unset and
listener.client_authentication != constants.CLIENT_AUTH_NONE) and
if (listener.client_authentication not in
(wtypes.Unset, constants.CLIENT_AUTH_NONE) and
not (db_listener.client_ca_tls_certificate_id or
listener.client_ca_tls_container_ref)):
raise exceptions.ValidationException(detail=_(

View File

@ -42,7 +42,7 @@ def url(url, require_scheme=True):
raise exceptions.InvalidURL(url=url)
p_url = rfc3986.urlparse(rfc3986.normalize_uri(url))
if require_scheme:
if p_url.scheme != 'http' and p_url.scheme != 'https':
if p_url.scheme not in ('http', 'https'):
raise exceptions.InvalidURL(url=url)
except Exception as e:
raise exceptions.InvalidURL(url=url) from e

View File

@ -1077,9 +1077,12 @@ class ControllerWorker(object):
else:
flavor = {constants.LOADBALANCER_TOPOLOGY: lb.topology}
provider_lb_dict = (
provider_utils.db_loadbalancer_to_provider_loadbalancer(
lb).to_dict() if lb else lb)
if lb:
provider_lb_dict = (
provider_utils.db_loadbalancer_to_provider_loadbalancer(
lb).to_dict())
else:
provider_lb_dict = lb
provider_lb_dict[constants.FLAVOR] = flavor