Remove all usage of six library

Convert all code to not require six library and instead
use python 3.x logic.

Created one helper method in common.utils for binary
representation to limit code changes.

Change-Id: I2716ce93691d11100ee951a3a3f491329a4073f0
This commit is contained in:
Brian Haley 2020-01-06 14:17:55 -05:00
parent e0d4c98556
commit f6b957e8ee
70 changed files with 229 additions and 451 deletions

View File

@ -15,10 +15,4 @@
import gettext
import six
if six.PY2:
gettext.install('octavia', # pylint: disable=unexpected-keyword-arg
unicode=1)
else:
gettext.install('octavia')

View File

@ -19,7 +19,6 @@ import socket
import subprocess
import pyroute2
import six
import webob
import netifaces
@ -178,7 +177,7 @@ class AmphoraInfo(object):
def get_interface(self, ip_addr):
try:
ip_version = ipaddress.ip_address(six.text_type(ip_addr)).version
ip_version = ipaddress.ip_address(ip_addr).version
except Exception:
return webob.Response(
json=dict(message="Invalid IP address"), status=400)

View File

@ -24,7 +24,6 @@ import flask
import jinja2
from oslo_config import cfg
from oslo_log import log as logging
import six
import webob
from werkzeug import exceptions
@ -86,7 +85,8 @@ class Loadbalancer(object):
with open(util.config_path(lb_id), 'r') as file:
cfg = file.read()
resp = webob.Response(cfg, content_type='text/plain')
resp.headers['ETag'] = hashlib.md5(six.b(cfg)).hexdigest() # nosec
resp.headers['ETag'] = (
hashlib.md5(octavia_utils.b(cfg)).hexdigest()) # nosec
return resp
def upload_haproxy_config(self, amphora_id, lb_id):
@ -408,7 +408,7 @@ class Loadbalancer(object):
with open(cert_path, 'r') as crt_file:
cert = crt_file.read()
md5 = hashlib.md5(six.b(cert)).hexdigest() # nosec
md5 = hashlib.md5(octavia_utils.b(cert)).hexdigest() # nosec
resp = webob.Response(json=dict(md5sum=md5))
resp.headers['ETag'] = md5
return resp

View File

@ -23,7 +23,6 @@ import distro
import jinja2
from oslo_config import cfg
from oslo_log import log as logging
import six
import webob
from werkzeug import exceptions
@ -162,11 +161,8 @@ class BaseOS(object):
try:
ip_addr = fixed_ip['ip_address']
cidr = fixed_ip['subnet_cidr']
ip = ipaddress.ip_address(ip_addr if isinstance(
ip_addr, six.text_type) else six.u(ip_addr))
network = ipaddress.ip_network(
cidr if isinstance(
cidr, six.text_type) else six.u(cidr))
ip = ipaddress.ip_address(ip_addr)
network = ipaddress.ip_network(cidr)
broadcast = network.broadcast_address.exploded
netmask = (network.prefixlen if ip.version == 6
else network.netmask.exploded)
@ -189,10 +185,7 @@ class BaseOS(object):
def get_host_routes(cls, fixed_ip):
host_routes = []
for hr in fixed_ip.get('host_routes', []):
network = ipaddress.ip_network(
hr['destination'] if isinstance(
hr['destination'], six.text_type) else
six.u(hr['destination']))
network = ipaddress.ip_network(hr['destination'])
host_routes.append({'network': network, 'gw': hr['nexthop']})
return host_routes
@ -502,8 +495,7 @@ class RH(BaseOS):
host_routes_ipv6 = []
for fixed_ip in fixed_ips:
ip_addr = fixed_ip['ip_address']
ip = ipaddress.ip_address(ip_addr if isinstance(
ip_addr, six.text_type) else six.u(ip_addr))
ip = ipaddress.ip_address(ip_addr)
if ip.version == 6:
host_routes_ipv6.extend(self.get_host_routes(fixed_ip))
else:

View File

@ -22,7 +22,6 @@ import subprocess
from oslo_config import cfg
from oslo_log import log as logging
import pyroute2
import six
import webob
from werkzeug import exceptions
@ -46,28 +45,19 @@ class Plug(object):
# Validate vip and subnet_cidr, calculate broadcast address and netmask
try:
render_host_routes = []
ip = ipaddress.ip_address(
vip if isinstance(vip, six.text_type) else six.u(vip))
network = ipaddress.ip_network(
subnet_cidr if isinstance(subnet_cidr, six.text_type)
else six.u(subnet_cidr))
ip = ipaddress.ip_address(vip)
network = ipaddress.ip_network(subnet_cidr)
vip = ip.exploded
broadcast = network.broadcast_address.exploded
netmask = (network.prefixlen if ip.version == 6
else network.netmask.exploded)
vrrp_version = None
if vrrp_ip:
vrrp_ip_obj = ipaddress.ip_address(
vrrp_ip if isinstance(vrrp_ip, six.text_type)
else six.u(vrrp_ip)
)
vrrp_ip_obj = ipaddress.ip_address(vrrp_ip)
vrrp_version = vrrp_ip_obj.version
if host_routes:
for hr in host_routes:
network = ipaddress.ip_network(
hr['destination'] if isinstance(
hr['destination'], six.text_type) else
six.u(hr['destination']))
network = ipaddress.ip_network(hr['destination'])
render_host_routes.append({'network': network,
'gw': hr['nexthop']})
except ValueError:

View File

@ -18,7 +18,6 @@ import stat
import flask
from oslo_config import cfg
from oslo_log import log as logging
import six
import webob
from werkzeug import exceptions
@ -47,7 +46,7 @@ def make_json_error(ex):
def register_app_error_handler(app):
for code in six.iterkeys(exceptions.default_exceptions):
for code in exceptions.default_exceptions:
app.register_error_handler(code, make_json_error)

View File

@ -15,8 +15,6 @@
import abc
import six
from oslo_config import cfg
from stevedore import driver as stevedore_driver
@ -24,8 +22,7 @@ CONF = cfg.CONF
UDP_SERVER_NAMESPACE = 'octavia.amphora.udp_api_server'
@six.add_metaclass(abc.ABCMeta)
class UdpListenerApiServerBase(object):
class UdpListenerApiServerBase(object, metaclass=abc.ABCMeta):
"""Base UDP Listener Server API
"""

View File

@ -16,11 +16,11 @@
import errno
import os
import queue
import time
from oslo_config import cfg
from oslo_log import log as logging
import six
from octavia.amphorae.backends.agent.api_server import util
from octavia.amphorae.backends.health_daemon import health_sender
@ -28,12 +28,6 @@ from octavia.amphorae.backends.utils import haproxy_query
from octavia.amphorae.backends.utils import keepalivedlvs_query
if six.PY2:
import Queue as queue # pylint: disable=wrong-import-order
else:
import queue # pylint: disable=wrong-import-order
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
SEQ = 0

View File

@ -15,9 +15,8 @@
import csv
import socket
import six
from octavia.common import constants as consts
from octavia.common import utils as octavia_utils
from octavia.i18n import _
@ -52,14 +51,14 @@ class HAProxyQuery(object):
raise Exception(_("HAProxy '{0}' query failed.").format(query))
try:
sock.send(six.b(query + '\n'))
sock.send(octavia_utils.b(query + '\n'))
data = u''
while True:
x = sock.recv(1024)
if not x:
break
data += x.decode('ascii') if (
isinstance(x, six.binary_type)) else x
isinstance(x, bytes)) else x
return data.rstrip()
finally:
sock.close()

View File

@ -15,7 +15,6 @@ import re
import subprocess
from oslo_log import log as logging
import six
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants
@ -70,7 +69,7 @@ def get_listener_realserver_mapping(ns_name, listener_ip_port,
# 'InActConn': 0
# }}
listener_ip, listener_port = listener_ip_port.rsplit(':', 1)
ip_obj = ipaddress.ip_address(six.text_type(listener_ip.strip('[]')))
ip_obj = ipaddress.ip_address(listener_ip.strip('[]'))
output = read_kernel_file(ns_name, KERNEL_LVS_PATH).split('\n')
if ip_obj.version == 4:
ip_to_hex_format = "0%X" % ip_obj._ip
@ -195,7 +194,7 @@ def get_udp_listener_resource_ipports_nsname(listener_id):
rs_ip_port_count = len(rs_ip_port_list)
for index in range(rs_ip_port_count):
if ipaddress.ip_address(
six.text_type(rs_ip_port_list[index][0])).version == 6:
rs_ip_port_list[index][0]).version == 6:
rs_ip_port_list[index] = (
'[' + rs_ip_port_list[index][0] + ']',
rs_ip_port_list[index][1])
@ -204,7 +203,7 @@ def get_udp_listener_resource_ipports_nsname(listener_id):
rs_ip_port_list[index][1])
if ipaddress.ip_address(
six.text_type(listener_ip_port[0])).version == 6:
listener_ip_port[0]).version == 6:
listener_ip_port = (
'[' + listener_ip_port[0] + ']', listener_ip_port[1])
resource_ipport_mapping['Listener']['ipport'] = (

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
from oslo_utils import excutils
from octavia.i18n import _
@ -35,7 +33,7 @@ class AmphoraDriverError(Exception):
super(AmphoraDriverError, self).__init__(self.message)
def __unicode__(self):
return six.text_type(self.msg)
return self.msg
@staticmethod
def use_fatal_exceptions():

View File

@ -15,12 +15,8 @@
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class AmphoraLoadBalancerDriver(object):
class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def update_amphora_listeners(self, loadbalancer, amphora,
timeout_dict):
@ -207,8 +203,7 @@ class AmphoraLoadBalancerDriver(object):
"""
@six.add_metaclass(abc.ABCMeta)
class HealthMixin(object):
class HealthMixin(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def update_health(self, health):
"""Return ceilometer ready health
@ -229,8 +224,7 @@ class HealthMixin(object):
"""
@six.add_metaclass(abc.ABCMeta)
class StatsMixin(object):
class StatsMixin(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def update_stats(self, stats):
"""Return ceilometer ready stats
@ -250,8 +244,7 @@ class StatsMixin(object):
"""
@six.add_metaclass(abc.ABCMeta)
class VRRPDriverMixin(object):
class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
"""Abstract mixin class for VRRP support in loadbalancer amphorae
Usage: To plug VRRP support in another service driver XYZ, use:

View File

@ -23,7 +23,6 @@ from oslo_context import context as oslo_context
from oslo_log import log as logging
import requests
import simplejson
import six
from stevedore import driver as stevedore_driver
from octavia.amphorae.driver_exceptions import exceptions as driver_except
@ -664,7 +663,7 @@ class AmphoraAPIClientBase(object):
self.ssl_adapter.uuid = amp.id
exception = None
# Keep retrying
for dummy in six.moves.xrange(conn_max_retries):
for dummy in range(conn_max_retries):
try:
with warnings.catch_warnings():
warnings.filterwarnings(
@ -705,9 +704,8 @@ class AmphoraAPIClientBase(object):
# For taskflow persistence cause attribute should
# be serializable to JSON. Pass None, as cause exception
# is described in the expection message.
six.raise_from(
driver_except.AmpConnectionRetry(exception=str(e)),
None)
raise driver_except.AmpConnectionRetry(
exception=str(e)) from None
LOG.error("Connection retries (currently set to %(max_retries)s) "
"exhausted. The amphora is unavailable. Reason: "
"%(exception)s",

View File

@ -17,7 +17,6 @@ import os
import jinja2
from oslo_config import cfg
import six
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants
@ -72,20 +71,17 @@ class KeepalivedJinjaTemplater(object):
# Validate the VIP address and see if it is IPv6
vip = loadbalancer.vip.ip_address
vip_addr = ipaddress.ip_address(
vip if isinstance(vip, six.text_type) else six.u(vip))
vip_addr = ipaddress.ip_address(vip)
vip_ipv6 = vip_addr.version == 6
# Normalize and validate the VIP subnet CIDR
vip_network_cidr = None
vip_cidr = (vip_cidr if isinstance(vip_cidr, six.text_type) else
six.u(vip_cidr))
if vip_ipv6:
vip_network_cidr = ipaddress.IPv6Network(vip_cidr).with_prefixlen
else:
vip_network_cidr = ipaddress.IPv4Network(vip_cidr).with_prefixlen
for amp in six.moves.filter(
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
if amp.vrrp_ip != amphora.vrrp_ip:

View File

@ -13,7 +13,6 @@
# under the License.
from oslo_log import log as logging
import six
from octavia.amphorae.drivers import driver_base
from octavia.amphorae.drivers.keepalived.jinja import jinja_cfg
@ -41,7 +40,7 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
LOG.debug("Update loadbalancer %s amphora VRRP configuration.",
loadbalancer.id)
for amp in six.moves.filter(
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
@ -67,7 +66,7 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
LOG.info("Stop loadbalancer %s amphora VRRP Service.",
loadbalancer.id)
for amp in six.moves.filter(
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
@ -82,7 +81,7 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
LOG.info("Start loadbalancer %s amphora VRRP Service.",
loadbalancer.id)
for amp in six.moves.filter(
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
@ -98,7 +97,7 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
LOG.info("Reload loadbalancer %s amphora VRRP Service.",
loadbalancer.id)
for amp in six.moves.filter(
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):

View File

@ -15,18 +15,14 @@
import copy
import netaddr
import six
from wsme import types as wtypes
from octavia.common import exceptions
from octavia.common import validate
if six.PY3:
unicode = str
class IPAddressType(wtypes.UserType):
basetype = unicode
basetype = str
name = 'ipaddress'
@staticmethod
@ -45,7 +41,7 @@ class IPAddressType(wtypes.UserType):
class CidrType(wtypes.UserType):
basetype = unicode
basetype = str
name = 'cidr'
@staticmethod
@ -59,7 +55,7 @@ class CidrType(wtypes.UserType):
class URLType(wtypes.UserType):
basetype = unicode
basetype = str
name = 'url'
def __init__(self, require_scheme=True):
@ -76,7 +72,7 @@ class URLType(wtypes.UserType):
class URLPathType(wtypes.UserType):
basetype = unicode
basetype = str
name = 'url_path'
@staticmethod
@ -119,8 +115,7 @@ class BaseMeta(wtypes.BaseMeta):
return super(BaseMeta, cls).__new__(cls, name, bases, dct)
@six.add_metaclass(BaseMeta)
class BaseType(wtypes.Base):
class BaseType(wtypes.Base, metaclass=BaseMeta):
@classmethod
def _full_response(cls):
return False

View File

@ -53,7 +53,7 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
topic=consts.TOPIC_AMPHORA_V2, version="2.0", fanout=False)
self.client = rpc.get_client(self.target)
self.repositories = repositories.Repositories()
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
self.fernet = fernet.Fernet(key)
def _validate_pool_algorithm(self, pool):

View File

@ -15,10 +15,9 @@
import errno
import os
import socketserver
import threading
import six.moves.socketserver as socketserver
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils

View File

@ -14,8 +14,6 @@
import copy
import six
from octavia_lib.api.drivers import data_models as driver_dm
from octavia_lib.api.drivers import exceptions as lib_exceptions
from oslo_config import cfg
@ -255,7 +253,7 @@ def listener_dict_to_provider_dict(listener_dict, for_delete=False):
del sni['listener']
sni_obj = data_models.SNI(**sni)
SNI_objs.append(sni_obj)
elif isinstance(sni, six.string_types):
elif isinstance(sni, str):
sni_obj = data_models.SNI(tls_container_id=sni)
SNI_objs.append(sni_obj)
else:

View File

@ -15,7 +15,6 @@
from oslo_config import cfg
from oslo_log import log as logging
import pecan
import six
from wsme import types as wtypes
from wsmeext import pecan as wsme_pecan
@ -49,7 +48,7 @@ class ProviderController(base.BaseController):
enabled_providers = CONF.api_settings.enabled_provider_drivers
response_list = [
provider_types.ProviderResponse(name=key, description=value) for
key, value in six.iteritems(enabled_providers)]
key, value in enabled_providers.items()]
if fields is not None:
response_list = self._filter_fields(response_list, fields)
return provider_types.ProvidersRootResponse(providers=response_list)
@ -106,16 +105,16 @@ class FlavorCapabilitiesController(base.BaseController):
constants.DESCRIPTION)
if name_filter:
metadata_dict = {
key: value for key, value in six.iteritems(metadata_dict) if
key: value for key, value in metadata_dict.items() if
key == name_filter}
if description_filter:
metadata_dict = {
key: value for key, value in six.iteritems(metadata_dict) if
key: value for key, value in metadata_dict.items() if
value == description_filter}
response_list = [
provider_types.ProviderResponse(name=key, description=value) for
key, value in six.iteritems(metadata_dict)]
key, value in metadata_dict.items()]
if fields is not None:
response_list = self._filter_fields(response_list, fields)
return provider_types.FlavorCapabilitiesResponse(
@ -158,16 +157,16 @@ class AvailabilityZoneCapabilitiesController(base.BaseController):
constants.DESCRIPTION)
if name_filter:
metadata_dict = {
key: value for key, value in six.iteritems(metadata_dict) if
key: value for key, value in metadata_dict.items() if
key == name_filter}
if description_filter:
metadata_dict = {
key: value for key, value in six.iteritems(metadata_dict) if
key: value for key, value in metadata_dict.items() if
value == description_filter}
response_list = [
provider_types.ProviderResponse(name=key, description=value) for
key, value in six.iteritems(metadata_dict)]
key, value in metadata_dict.items()]
if fields is not None:
response_list = self._filter_fields(response_list, fields)
return provider_types.AvailabilityZoneCapabilitiesResponse(

View File

@ -21,7 +21,6 @@ import abc
from barbicanclient.v1 import containers
from oslo_utils import encodeutils
import six
from octavia.certificates.common import cert
from octavia.common.tls_utils import cert_parser
@ -62,8 +61,7 @@ class BarbicanCert(cert.Cert):
return None
@six.add_metaclass(abc.ABCMeta)
class BarbicanAuth(object):
class BarbicanAuth(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_barbican_client(self, project_id):
"""Creates a Barbican client object.

View File

@ -15,11 +15,8 @@
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Cert(object):
class Cert(object, metaclass=abc.ABCMeta):
"""Base class to represent all certificates."""
@abc.abstractmethod

View File

@ -18,11 +18,8 @@ Certificate Generator API
"""
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class CertGenerator(object):
class CertGenerator(object, metaclass=abc.ABCMeta):
"""Base Cert Generator Interface
A Certificate Generator is responsible for generating private keys,

View File

@ -23,7 +23,6 @@ from cryptography.hazmat.primitives import serialization
from cryptography import x509
from oslo_config import cfg
from oslo_log import log as logging
import six
from octavia.certificates.common import local as local_common
from octavia.certificates.generator import cert_gen
@ -186,7 +185,6 @@ class LocalCertGenerator(cert_gen.CertGenerator):
@classmethod
def _generate_csr(cls, cn, private_key, passphrase=None):
cn = six.text_type(cn)
pk = serialization.load_pem_private_key(
data=private_key, password=passphrase,
backend=backends.default_backend())

View File

@ -18,11 +18,8 @@ Certificate manager API
"""
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class CertManager(object):
class CertManager(object, metaclass=abc.ABCMeta):
"""Base Cert Manager Interface
A Cert Manager is responsible for managing certificates for TLS.

View File

@ -18,7 +18,6 @@ import uuid
from oslo_config import cfg
from oslo_log import log as logging
import six
from octavia.certificates.common import local as local_common
from octavia.certificates.manager import cert_mgr
@ -51,9 +50,9 @@ class LocalCertManager(cert_mgr.CertManager):
"""
cert_ref = str(uuid.uuid4())
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
if type(certificate) == six.binary_type:
if type(certificate) == bytes:
certificate = certificate.decode('utf-8')
if type(private_key) == six.binary_type:
if type(private_key) == bytes:
private_key = private_key.decode('utf-8')
LOG.info("Storing certificate data on the local filesystem.")
@ -72,7 +71,7 @@ class LocalCertManager(cert_mgr.CertManager):
if intermediates:
filename_intermediates = "{0}.int".format(filename_base)
if type(intermediates) == six.binary_type:
if type(intermediates) == bytes:
intermediates = intermediates.decode('utf-8')
with os.fdopen(os.open(
filename_intermediates, flags, mode), 'w') as int_file:
@ -80,7 +79,7 @@ class LocalCertManager(cert_mgr.CertManager):
if private_key_passphrase:
filename_pkp = "{0}.pass".format(filename_base)
if type(private_key_passphrase) == six.binary_type:
if type(private_key_passphrase) == bytes:
private_key_passphrase = private_key_passphrase.decode(
'utf-8')
with os.fdopen(os.open(

View File

@ -16,7 +16,6 @@
import re
import six
from sqlalchemy.orm import collections
from octavia.common import constants
@ -63,8 +62,6 @@ class BaseDataModel(object):
recurse=recurse)
else:
ret[attr] = None
elif six.PY2 and isinstance(value, six.text_type):
ret[attr.encode('utf8')] = value.encode('utf8')
else:
ret[attr] = value
else:

View File

@ -17,8 +17,6 @@
Octavia base exception handling.
"""
import six
from oslo_utils import excutils
from webob import exc
@ -52,7 +50,7 @@ class OctaviaException(Exception):
super(OctaviaException, self).__init__(self.message)
def __unicode__(self):
return six.text_type(self.msg)
return self.msg
@staticmethod
def use_fatal_exceptions():

View File

@ -16,7 +16,6 @@ import os
import re
import jinja2
import six
from octavia.common.config import cfg
from octavia.common import constants
@ -492,7 +491,7 @@ class JinjaTemplater(object):
if '-' in code:
low, hi = code.split('-')[:2]
retval.update(
str(i) for i in six.moves.xrange(int(low), int(hi) + 1))
str(i) for i in range(int(low), int(hi) + 1))
else:
retval.add(code)
return sorted(retval)

View File

@ -16,7 +16,6 @@ import os
import re
import jinja2
import six
from octavia.common.config import cfg
from octavia.common import constants
@ -482,7 +481,7 @@ class JinjaTemplater(object):
if '-' in code:
low, hi = code.split('-')[:2]
retval.update(
str(i) for i in six.moves.xrange(int(low), int(hi) + 1))
str(i) for i in range(int(low), int(hi) + 1))
else:
retval.add(code)
return sorted(retval)

View File

@ -24,10 +24,10 @@ from oslo_log import log as logging
from pyasn1.codec.der import decoder as der_decoder
from pyasn1.codec.der import encoder as der_encoder
from pyasn1_modules import rfc2315
import six
from octavia.common import data_models
from octavia.common import exceptions
from octavia.common import utils as octavia_utils
X509_BEG = b'-----BEGIN CERTIFICATE-----'
X509_END = b'-----END CERTIFICATE-----'
@ -72,9 +72,9 @@ def _read_private_key(private_key_pem, passphrase=None):
:param passphrase: Optional passphrase needed to decrypt the private key
:returns: a RSAPrivatekey object
"""
if passphrase and type(passphrase) == six.text_type:
if passphrase and isinstance(passphrase, str):
passphrase = passphrase.encode("utf-8")
if type(private_key_pem) == six.text_type:
if isinstance(private_key_pem, str):
private_key_pem = private_key_pem.encode('utf-8')
try:
@ -106,7 +106,7 @@ def get_intermediates_pems(intermediates=None):
X509 pem block surrounded by BEGIN CERTIFICATE,
END CERTIFICATE block tags
"""
if isinstance(intermediates, six.string_types):
if isinstance(intermediates, str):
try:
intermediates = intermediates.encode("utf-8")
except UnicodeDecodeError:
@ -139,13 +139,13 @@ def _split_x509s(xstr):
"""
curr_pem_block = []
inside_x509 = False
if type(xstr) == six.binary_type:
if isinstance(xstr, bytes):
xstr = xstr.decode('utf-8')
for line in xstr.replace("\r", "").split("\n"):
if inside_x509:
curr_pem_block.append(line)
if line == X509_END.decode('utf-8'):
yield six.b("\n".join(curr_pem_block))
yield octavia_utils.b("\n".join(curr_pem_block))
curr_pem_block = []
inside_x509 = False
continue
@ -193,7 +193,7 @@ def _read_pem_blocks(data):
stopMarkers = {PKCS7_END.decode('utf-8'): 0}
idx = -1
state = stSpam
if type(data) == six.binary_type:
if isinstance(data, bytes):
data = data.decode('utf-8')
for certLine in data.replace('\r', '').split('\n'):
if not certLine:
@ -254,7 +254,7 @@ def get_host_names(certificate):
certificate, and 'dns_names' is a list of dNSNames
(possibly empty) from the SubjectAltNames of the certificate.
"""
if isinstance(certificate, six.string_types):
if isinstance(certificate, str):
certificate = certificate.encode('utf-8')
try:
cert = x509.load_pem_x509_certificate(certificate,
@ -301,7 +301,7 @@ def _get_x509_from_pem_bytes(certificate_pem):
:param certificate_pem: Certificate in PEM format
:returns: crypto high-level x509 data from the PEM string
"""
if type(certificate_pem) == six.text_type:
if isinstance(certificate_pem, str):
certificate_pem = certificate_pem.encode('utf-8')
try:
x509cert = x509.load_pem_x509_certificate(certificate_pem,
@ -386,15 +386,15 @@ def _map_cert_tls_container(cert):
private_key = cert.get_private_key()
private_key_passphrase = cert.get_private_key_passphrase()
intermediates = cert.get_intermediates()
if isinstance(certificate, six.string_types):
if isinstance(certificate, str):
certificate = certificate.encode('utf-8')
if isinstance(private_key, six.string_types):
if isinstance(private_key, str):
private_key = private_key.encode('utf-8')
if isinstance(private_key_passphrase, six.string_types):
if isinstance(private_key_passphrase, str):
private_key_passphrase = private_key_passphrase.encode('utf-8')
if intermediates:
intermediates = [
(imd.encode('utf-8') if isinstance(imd, six.string_types) else imd)
(imd.encode('utf-8') if isinstance(imd, str) else imd)
for imd in intermediates
]
else:

View File

@ -26,7 +26,6 @@ import netaddr
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import six
from stevedore import driver as stevedore_driver
CONF = cfg.CONF
@ -97,15 +96,15 @@ def ip_netmask_to_cidr(ip, netmask):
return "{ip}/{netmask}".format(ip=net.network, netmask=net.prefixlen)
def get_six_compatible_value(value, six_type=six.string_types):
if six.PY3 and isinstance(value, six_type):
def get_compatible_value(value):
if isinstance(value, str):
value = value.encode('utf-8')
return value
def get_six_compatible_server_certs_key_passphrase():
def get_compatible_server_certs_key_passphrase():
key = CONF.certificates.server_certs_key_passphrase
if six.PY3 and isinstance(key, six.string_types):
if isinstance(key, str):
key = key.encode('utf-8')
return base64.urlsafe_b64encode(key)
@ -117,6 +116,10 @@ def subnet_ip_availability(nw_ip_avail, subnet_id, req_num_ips):
return None
def b(s):
return s.encode('utf-8')
class exception_logger(object):
"""Wrap a function and log raised exception

View File

@ -25,7 +25,6 @@ import re
import netaddr
from oslo_config import cfg
import rfc3986
import six
from wsme import types as wtypes
from octavia.common import constants
@ -419,7 +418,7 @@ def check_session_persistence(SP_dict):
def ip_not_reserved(ip_address):
ip_address = (
ipaddress.ip_address(six.text_type(ip_address)).exploded.upper())
ipaddress.ip_address(ip_address).exploded.upper())
if ip_address in CONF.networking.reserved_ips:
raise exceptions.InvalidOption(value=ip_address,
option='member address')

View File

@ -14,11 +14,8 @@
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class ComputeBase(object):
class ComputeBase(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def build(self, name="amphora_name", amphora_flavor=None,

View File

@ -16,7 +16,6 @@
from cryptography import fernet
from oslo_config import cfg
from oslo_log import log as logging
import six
from stevedore import driver as stevedore_driver
from taskflow import task
from taskflow.types import failure
@ -184,7 +183,7 @@ class AmphoraePostNetworkPlug(BaseAmphoraTask):
if isinstance(result, failure.Failure):
return
LOG.warning("Reverting post network plug.")
for amphora in six.moves.filter(
for amphora in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
@ -234,7 +233,7 @@ class AmphoraCertUpload(BaseAmphoraTask):
def execute(self, amphora, server_pem):
"""Execute cert_update_amphora routine."""
LOG.debug("Upload cert in amphora REST driver")
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
self.amphora_driver.upload_cert_amp(amphora, fer.decrypt(server_pem))
@ -250,7 +249,7 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
CONF.haproxy_amphora.active_connection_max_retries,
constants.CONN_RETRY_INTERVAL:
CONF.haproxy_amphora.active_connection_rety_interval}
for amp in six.moves.filter(
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
@ -278,7 +277,7 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
if isinstance(result, failure.Failure):
return
LOG.warning("Reverting Get Amphora VRRP Interface.")
for amp in six.moves.filter(
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):

View File

@ -45,7 +45,7 @@ class GenerateServerPEMTask(BaseCertTask):
cert = self.cert_generator.generate_cert_key_pair(
cn=amphora_id,
validity=CONF.certificates.cert_validity_time)
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
return fer.encrypt(cert.certificate + cert.private_key)

View File

@ -160,7 +160,7 @@ class CertComputeCreate(ComputeCreate):
with open(CONF.controller_worker.client_ca, 'r') as client_ca:
ca = client_ca.read()
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
config_drive_files = {
'/etc/octavia/certs/server.pem': fer.decrypt(server_pem),

View File

@ -19,7 +19,6 @@ from oslo_db import exception as odb_exceptions
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import uuidutils
import six
import sqlalchemy
from sqlalchemy.orm import exc
from taskflow import task
@ -936,7 +935,7 @@ class UpdateAmphoraDBCertExpiration(BaseDatabaseTask):
LOG.debug("Update DB cert expiry date of amphora id: %s", amphora_id)
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
cert_expiration = cert_parser.get_cert_expiration(
fer.decrypt(server_pem))
@ -2631,7 +2630,7 @@ class DecrementPoolQuota(BaseDatabaseTask):
# This is separate calls to maximize the correction
# should other factors have increased the in use quota
# before this point in the revert flow
for i in six.moves.range(pool_child_count['member']):
for i in range(pool_child_count['member']):
lock_session = db_apis.get_session(autocommit=False)
try:
self.repos.check_quota_met(session,

View File

@ -15,7 +15,6 @@
from oslo_config import cfg
from oslo_log import log as logging
import six
from taskflow import task
from taskflow.types import failure
@ -105,7 +104,7 @@ class CalculateDelta(BaseNetworkTask):
calculate_amp = CalculateAmphoraDelta()
deltas = {}
for amphora in six.moves.filter(
for amphora in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
@ -274,7 +273,7 @@ class HandleNetworkDeltas(BaseNetworkTask):
def execute(self, deltas):
"""Handle network plugging based off deltas."""
added_ports = {}
for amp_id, delta in six.iteritems(deltas):
for amp_id, delta in deltas.items():
added_ports[amp_id] = []
for nic in delta.add_nics:
interface = self.network_driver.plug_network(delta.compute_id,
@ -300,7 +299,7 @@ class HandleNetworkDeltas(BaseNetworkTask):
if isinstance(result, failure.Failure):
return
for amp_id, delta in six.iteritems(deltas):
for amp_id, delta in deltas.items():
LOG.warning("Unable to plug networks for amp id %s",
delta.amphora_id)
if not delta:
@ -337,7 +336,7 @@ class PlugVIP(BaseNetworkTask):
try:
# Make sure we have the current port IDs for cleanup
for amp_data in result:
for amphora in six.moves.filter(
for amphora in filter(
# pylint: disable=cell-var-from-loop
lambda amp: amp.id == amp_data.id,
loadbalancer.amphorae):

View File

@ -16,7 +16,6 @@
from cryptography import fernet
from oslo_config import cfg
from oslo_log import log as logging
import six
from stevedore import driver as stevedore_driver
from taskflow import retry
from taskflow import task
@ -256,7 +255,7 @@ class AmphoraePostNetworkPlug(BaseAmphoraTask):
db_lb = self.loadbalancer_repo.get(
db_apis.get_session(), id=loadbalancer[constants.LOADBALANCER_ID])
LOG.warning("Reverting post network plug.")
for amphora in six.moves.filter(
for amphora in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
db_lb.amphorae):
@ -325,7 +324,7 @@ class AmphoraCertUpload(BaseAmphoraTask):
def execute(self, amphora, server_pem):
"""Execute cert_update_amphora routine."""
LOG.debug("Upload cert in amphora REST driver")
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
db_amp = self.amphora_repo.get(db_apis.get_session(),
id=amphora.get(constants.ID))
@ -346,7 +345,7 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
CONF.haproxy_amphora.active_connection_max_retries,
constants.CONN_RETRY_INTERVAL:
CONF.haproxy_amphora.active_connection_rety_interval}
for amp in six.moves.filter(
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
db_lb.amphorae):
@ -377,7 +376,7 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
LOG.warning("Reverting Get Amphora VRRP Interface.")
db_lb = self.loadbalancer_repo.get(
db_apis.get_session(), id=loadbalancer[constants.LOADBALANCER_ID])
for amp in six.moves.filter(
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
db_lb.amphorae):

View File

@ -45,7 +45,7 @@ class GenerateServerPEMTask(BaseCertTask):
cert = self.cert_generator.generate_cert_key_pair(
cn=amphora_id,
validity=CONF.certificates.cert_validity_time)
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
# storing in db requires conversion bytes to string

View File

@ -167,7 +167,7 @@ class CertComputeCreate(ComputeCreate):
with open(CONF.controller_worker.client_ca, 'r') as client_ca:
ca = client_ca.read()
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
config_drive_files = {
'/etc/octavia/certs/server.pem': fer.decrypt(

View File

@ -19,7 +19,6 @@ from oslo_db import exception as odb_exceptions
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import uuidutils
import six
import sqlalchemy
from sqlalchemy.orm import exc
from taskflow import task
@ -999,7 +998,7 @@ class UpdateAmphoraDBCertExpiration(BaseDatabaseTask):
LOG.debug("Update DB cert expiry date of amphora id: %s", amphora_id)
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
cert_expiration = cert_parser.get_cert_expiration(
fer.decrypt(server_pem.encode("utf-8")))
@ -2818,7 +2817,7 @@ class DecrementPoolQuota(BaseDatabaseTask):
# This is separate calls to maximize the correction
# should other factors have increased the in use quota
# before this point in the revert flow
for i in six.moves.range(pool_child_count['member']):
for i in range(pool_child_count['member']):
lock_session = db_apis.get_session(autocommit=False)
try:
self.repos.check_quota_met(session,

View File

@ -15,7 +15,6 @@
from oslo_config import cfg
from oslo_log import log as logging
import six
from taskflow import task
from taskflow.types import failure
@ -117,7 +116,7 @@ class CalculateDelta(BaseNetworkTask):
deltas = {}
db_lb = self.loadbalancer_repo.get(
db_apis.get_session(), id=loadbalancer[constants.LOADBALANCER_ID])
for amphora in six.moves.filter(
for amphora in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
db_lb.amphorae):
@ -294,7 +293,7 @@ class HandleNetworkDeltas(BaseNetworkTask):
def execute(self, deltas):
"""Handle network plugging based off deltas."""
added_ports = {}
for amp_id, delta in six.iteritems(deltas):
for amp_id, delta in deltas.items():
added_ports[amp_id] = []
for nic in delta[constants.ADD_NICS]:
interface = self.network_driver.plug_network(
@ -322,7 +321,7 @@ class HandleNetworkDeltas(BaseNetworkTask):
if isinstance(result, failure.Failure):
return
for amp_id, delta in six.iteritems(deltas):
for amp_id, delta in deltas.items():
LOG.warning("Unable to plug networks for amp id %s",
delta[constants.AMPHORA_ID])
if not delta:
@ -364,7 +363,7 @@ class PlugVIP(BaseNetworkTask):
try:
# Make sure we have the current port IDs for cleanup
for amp_data in result:
for amphora in six.moves.filter(
for amphora in filter(
# pylint: disable=cell-var-from-loop
lambda amp: amp.id == amp_data['id'],
db_lb.amphorae):
@ -549,7 +548,7 @@ class GetAmphoraNetworkConfigs(BaseNetworkTask):
db_configs = self.network_driver.get_network_configs(
db_lb, amphora=db_amp)
provider_dict = {}
for amp_id, amp_conf in six.iteritems(db_configs):
for amp_id, amp_conf in db_configs.items():
provider_dict[amp_id] = amp_conf.to_dict(recurse=True)
return provider_dict
@ -563,7 +562,7 @@ class GetAmphoraeNetworkConfigs(BaseNetworkTask):
db_apis.get_session(), id=loadbalancer[constants.LOADBALANCER_ID])
db_configs = self.network_driver.get_network_configs(db_lb)
provider_dict = {}
for amp_id, amp_conf in six.iteritems(db_configs):
for amp_id, amp_conf in db_configs.items():
provider_dict[amp_id] = amp_conf.to_dict(recurse=True)
return provider_dict

View File

@ -15,15 +15,12 @@
import abc
import six
# This class describes the abstraction of a distributor interface.
# Distributor implementations may be: a noop, a single hardware device,
# a single amphora, or multiple amphora among other options.
@six.add_metaclass(abc.ABCMeta)
class DistributorDriver(object):
class DistributorDriver(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_create_distributor_subflow(self):
"""Get a subflow to create a distributor

View File

@ -57,13 +57,9 @@ assert_not_equal_end_with_none_re = re.compile(
r"(.)*assertNotEqual\(.+, None\)")
assert_not_equal_start_with_none_re = re.compile(
r"(.)*assertNotEqual\(None, .+\)")
assert_no_xrange_re = re.compile(
r"\s*xrange\s*\(")
revert_must_have_kwargs_re = re.compile(
r'[ ]*def revert\(.+,[ ](?!\*\*kwargs)\w+\):')
untranslated_exception_re = re.compile(r"raise (?:\w*)\((.*)\)")
no_basestring_re = re.compile(r"\bbasestring\b")
no_iteritems_re = re.compile(r".*\.iteritems\(\)")
no_eventlet_re = re.compile(r'(import|from)\s+[(]?eventlet')
no_line_continuation_backslash_re = re.compile(r'.*(\\)\n')
no_logging_re = re.compile(r'(import|from)\s+[(]?logging')
@ -143,15 +139,6 @@ def no_log_warn(logical_line):
yield(0, "O339:Use LOG.warning() rather than LOG.warn()")
def no_xrange(logical_line):
"""Disallow 'xrange()'
O340
"""
if assert_no_xrange_re.match(logical_line):
yield(0, "O340: Do not use xrange().")
def no_translate_logs(logical_line, filename):
"""O341 - Don't translate logs.
@ -198,37 +185,6 @@ def check_raised_localized_exceptions(logical_line, filename):
yield (logical_line.index(exception_msg), msg)
def check_no_basestring(logical_line):
"""O343 - basestring is not Python3-compatible.
:param logical_line: The logical line to check.
:returns: None if the logical line passes the check, otherwise a tuple
is yielded that contains the offending index in logical line
and a message describe the check validation failure.
"""
if no_basestring_re.search(logical_line):
msg = ("O343: basestring is not Python3-compatible, use "
"six.string_types instead.")
yield(0, msg)
def check_python3_no_iteritems(logical_line):
"""O344 - Use dict.items() instead of dict.iteritems().
:param logical_line: The logical line to check.
:returns: None if the logical line passes the check, otherwise a tuple
is yielded that contains the offending index in logical line
and a message describe the check validation failure.
"""
if no_iteritems_re.search(logical_line):
msg = ("O344: Use dict.items() instead of dict.iteritems() to be "
"compatible with both Python 2 and Python 3. In Python 2, "
"dict.items() may be inefficient for very large dictionaries. "
"If you can prove that you need the optimization of an "
"iterator for Python 2, then you can use six.iteritems(dict).")
yield(0, msg)
def check_no_eventlet_imports(logical_line):
"""O345 - Usage of Python eventlet module not allowed.
@ -297,10 +253,7 @@ def factory(register):
register(no_mutable_default_args)
register(assert_equal_in)
register(no_log_warn)
register(no_xrange)
register(check_raised_localized_exceptions)
register(check_no_basestring)
register(check_python3_no_iteritems)
register(check_no_eventlet_imports)
register(check_line_continuation_no_backslash)
register(revert_must_have_kwargs)

View File

@ -14,8 +14,6 @@
import abc
import six
from octavia.common import exceptions
@ -79,8 +77,7 @@ class QosPolicyNotFound(NetworkException):
pass
@six.add_metaclass(abc.ABCMeta)
class AbstractNetworkDriver(object):
class AbstractNetworkDriver(object, metaclass=abc.ABCMeta):
"""This class defines the methods for a fully functional network driver.
Implementations of this interface can expect a rollback to occur if any of

View File

@ -19,7 +19,6 @@ from neutronclient.common import exceptions as neutron_client_exceptions
from novaclient import exceptions as nova_client_exceptions
from oslo_config import cfg
from oslo_log import log as logging
import six
from stevedore import driver as stevedore_driver
from octavia.common import constants
@ -143,8 +142,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
return None
def _get_ethertype_for_ip(self, ip):
address = ipaddress.ip_address(
ip if isinstance(ip, six.text_type) else six.u(ip))
address = ipaddress.ip_address(ip)
return 'IPv6' if address.version == 6 else 'IPv4'
def _update_security_group_rules(self, load_balancer, sec_grp_id):
@ -410,7 +408,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
self.update_vip_sg(load_balancer, vip)
plugged_amphorae = []
subnet = self.get_subnet(vip.subnet_id)
for amphora in six.moves.filter(
for amphora in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
load_balancer.amphorae):
plugged_amphorae.append(self.plug_aap_port(load_balancer, vip,
@ -508,7 +506,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
"found").format(vip.subnet_id)
LOG.exception(msg)
raise base.PluggedVIPNotFound(msg)
for amphora in six.moves.filter(
for amphora in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
load_balancer.amphorae):
self.unplug_aap_port(vip, amphora, subnet)

View File

@ -16,7 +16,6 @@
import base64
import pkg_resources
import six
X509_CERT_CN = 'www.example.com'
@ -415,14 +414,12 @@ WHMk0DiS1quLYFZK2QhyFY2D1VLweyTQl8Hb/yYbxmd9QZDpDGCaIRkDt5H+rX17
-----END PKCS7-----
More spam here, too. Should be ignored."""
# Needed because we want PKCS7_DER to be raw bytes, not base64 encoded
if six.PY2:
def b64decode(thing):
return base64.decodestring(thing)
elif six.PY3:
def b64decode(thing):
return base64.decodebytes(bytes(thing, encoding='UTF-8'))
PKCS7_DER = b64decode(
'MIILZwYJKoZIhvcNAQcCoIILWDCCC1QCAQExADALBgkqhkiG9w0BBwGgggs6MIIF' +
'cjCCA1qgAwIBAgICEAAwDQYJKoZIhvcNAQELBQAwbTELMAkGA1UEBhMCVVMxEzAR' +

View File

@ -38,7 +38,6 @@ class OpenFixture(fixtures.Fixture):
return self.mock_open(name, *args, **kwargs)
return self._orig_open(name, *args, **kwargs)
self._patch = mock.patch('six.moves.builtins.open',
new=replacement_open)
self._patch = mock.patch('builtins.open', new=replacement_open)
self._patch.start()
self.addCleanup(self._patch.stop)

View File

@ -24,7 +24,6 @@ import fixtures
from oslo_config import fixture as oslo_fixture
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
import six
from octavia.amphorae.backends.agent import api_server
from octavia.amphorae.backends.agent.api_server import certificate_update
@ -698,7 +697,7 @@ class TestServerTestCase(base.TestCase):
rv = self.centos_app.get('/' + api_server.VERSION +
'/loadbalancer/123/haproxy')
self.assertEqual(200, rv.status_code)
self.assertEqual(six.b(CONTENT), rv.data)
self.assertEqual(octavia_utils.b(CONTENT), rv.data)
self.assertEqual('text/plain; charset=utf-8',
rv.headers['Content-Type'].lower())
@ -863,7 +862,8 @@ class TestServerTestCase(base.TestCase):
rv = self.centos_app.get('/' + api_server.VERSION +
'/loadbalancer/123/certificates/test.pem')
self.assertEqual(200, rv.status_code)
self.assertEqual(dict(md5sum=hashlib.md5(six.b(CONTENT)).hexdigest()),
self.assertEqual(dict(md5sum=hashlib.md5(octavia_utils.
b(CONTENT)).hexdigest()),
jsonutils.loads(rv.data.decode('utf-8')))
def test_ubuntu_upload_certificate_md5(self):
@ -910,7 +910,7 @@ class TestServerTestCase(base.TestCase):
self.assertEqual(200, rv.status_code)
self.assertEqual(OK, jsonutils.loads(rv.data.decode('utf-8')))
handle = m()
handle.write.assert_called_once_with(six.b('TestTest'))
handle.write.assert_called_once_with(octavia_utils.b('TestTest'))
mock_exists.return_value = False
m = self.useFixture(test_utils.OpenFixture(path)).mock_open
@ -927,7 +927,7 @@ class TestServerTestCase(base.TestCase):
self.assertEqual(200, rv.status_code)
self.assertEqual(OK, jsonutils.loads(rv.data.decode('utf-8')))
handle = m()
handle.write.assert_called_once_with(six.b('TestTest'))
handle.write.assert_called_once_with(octavia_utils.b('TestTest'))
mock_makedir.assert_called_once_with('/var/lib/octavia/certs/123')
def test_ubuntu_upload_server_certificate(self):
@ -950,8 +950,8 @@ class TestServerTestCase(base.TestCase):
self.assertEqual(202, rv.status_code)
self.assertEqual(OK, jsonutils.loads(rv.data.decode('utf-8')))
handle = m()
handle.write.assert_any_call(six.b('TestT'))
handle.write.assert_any_call(six.b('est'))
handle.write.assert_any_call(octavia_utils.b('TestT'))
handle.write.assert_any_call(octavia_utils.b('est'))
def test_ubuntu_plug_network(self):
self._test_plug_network(consts.UBUNTU)
@ -2592,8 +2592,8 @@ class TestServerTestCase(base.TestCase):
self.assertEqual(202, rv.status_code)
self.assertEqual(OK, jsonutils.loads(rv.data.decode('utf-8')))
handle = m()
handle.write.assert_any_call(six.b('TestT'))
handle.write.assert_any_call(six.b('est'))
handle.write.assert_any_call(octavia_utils.b('TestT'))
handle.write.assert_any_call(octavia_utils.b('est'))
mock_mutate.assert_called_once_with()
# Test the exception handling

View File

@ -12,22 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import queue
from unittest import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
import six
from octavia.amphorae.backends.health_daemon import health_daemon
from octavia.common import constants
import octavia.tests.unit.base as base
if six.PY2:
import Queue as queue
else:
import queue
LISTENER_ID1 = uuidutils.generate_uuid()
LISTENER_ID2 = uuidutils.generate_uuid()

View File

@ -14,10 +14,9 @@
import socket
from unittest import mock
import six
from octavia.amphorae.backends.utils import haproxy_query as query
from octavia.common import constants
from octavia.common import utils as octavia_utils
import octavia.tests.unit.base as base
STATS_SOCKET_SAMPLE = (
@ -87,7 +86,7 @@ class QueryTestCase(base.TestCase):
self.q._query('test')
sock.connect.assert_called_once_with('')
sock.send.assert_called_once_with(six.b('test\n'))
sock.send.assert_called_once_with(octavia_utils.b('test\n'))
sock.recv.assert_called_with(1024)
self.assertTrue(sock.close.called)

View File

@ -20,12 +20,12 @@ from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
import requests
import requests_mock
import six
from octavia.amphorae.driver_exceptions import exceptions as driver_except
from octavia.amphorae.drivers.haproxy import exceptions as exc
from octavia.amphorae.drivers.haproxy import rest_api_driver as driver
from octavia.common import constants
from octavia.common import utils as octavia_utils
from octavia.db import models
from octavia.network import data_models as network_models
from octavia.tests.common import sample_certs
@ -267,10 +267,10 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.amp, self.sl_udp.id, timeout_dict=None)
def test_upload_cert_amp(self):
self.driver.upload_cert_amp(self.amp, six.b('test'))
self.driver.upload_cert_amp(self.amp, octavia_utils.b('test'))
self.driver.clients[
API_VERSION].update_cert_for_rotation.assert_called_once_with(
self.amp, six.b('test'))
self.amp, octavia_utils.b('test'))
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
def test__process_tls_certificates_no_ca_cert(self, mock_load_crt):
@ -660,10 +660,11 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.assertEqual(ref_haproxy_version, result)
def test_update_amphora_agent_config(self):
self.driver.update_amphora_agent_config(self.amp, six.b('test'))
self.driver.update_amphora_agent_config(
self.amp, octavia_utils.b('test'))
self.driver.clients[
API_VERSION].update_agent_config.assert_called_once_with(
self.amp, six.b('test'), timeout_dict=None)
self.amp, octavia_utils.b('test'), timeout_dict=None)
class TestAmphoraAPIClientTest(base.TestCase):

View File

@ -20,12 +20,12 @@ from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
import requests
import requests_mock
import six
from octavia.amphorae.driver_exceptions import exceptions as driver_except
from octavia.amphorae.drivers.haproxy import exceptions as exc
from octavia.amphorae.drivers.haproxy import rest_api_driver as driver
from octavia.common import constants
from octavia.common import utils as octavia_utils
from octavia.db import models
from octavia.network import data_models as network_models
from octavia.tests.common import sample_certs
@ -268,10 +268,10 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.amp, self.sl_udp.id, timeout_dict=None)
def test_upload_cert_amp(self):
self.driver.upload_cert_amp(self.amp, six.b('test'))
self.driver.upload_cert_amp(self.amp, octavia_utils.b('test'))
self.driver.clients[
API_VERSION].update_cert_for_rotation.assert_called_once_with(
self.amp, six.b('test'))
self.amp, octavia_utils.b('test'))
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
def test__process_tls_certificates_no_ca_cert(self, mock_load_crt):
@ -754,10 +754,11 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.assertEqual(ref_haproxy_version, result)
def test_update_amphora_agent_config(self):
self.driver.update_amphora_agent_config(self.amp, six.b('test'))
self.driver.update_amphora_agent_config(
self.amp, octavia_utils.b('test'))
self.driver.clients[
API_VERSION].update_agent_config.assert_called_once_with(
self.amp, six.b('test'), timeout_dict=None)
self.amp, octavia_utils.b('test'), timeout_dict=None)
class TestAmphoraAPIClientTest(base.TestCase):

View File

@ -15,9 +15,9 @@ from unittest import mock
from barbicanclient.v1 import containers
from barbicanclient.v1 import secrets
import six
import octavia.certificates.common.barbican as barbican_common
from octavia.common import utils as octavia_utils
import octavia.tests.common.sample_certs as sample
import octavia.tests.unit.base as base
@ -44,9 +44,9 @@ class TestBarbicanCert(base.TestCase):
def test_barbican_cert(self):
# Certificate data
self.certificate = six.binary_type(sample.X509_CERT)
self.certificate = bytes(sample.X509_CERT)
self.intermediates = sample.X509_IMDS_LIST
self.private_key = six.binary_type(sample.X509_CERT_KEY_ENCRYPTED)
self.private_key = bytes(sample.X509_CERT_KEY_ENCRYPTED)
self.private_key_passphrase = sample.X509_CERT_KEY_PASSPHRASE
self._prepare()
@ -68,15 +68,14 @@ class TestBarbicanCert(base.TestCase):
self.assertEqual(cert.get_private_key(),
sample.X509_CERT_KEY_ENCRYPTED)
self.assertEqual(cert.get_private_key_passphrase(),
six.b(sample.X509_CERT_KEY_PASSPHRASE))
octavia_utils.b(sample.X509_CERT_KEY_PASSPHRASE))
def test_barbican_cert_text(self):
# Certificate data
self.certificate = six.text_type(sample.X509_CERT)
self.intermediates = six.text_type(sample.X509_IMDS_LIST)
self.private_key = six.text_type(sample.X509_CERT_KEY_ENCRYPTED)
self.private_key_passphrase = six.text_type(
sample.X509_CERT_KEY_PASSPHRASE)
self.certificate = str(sample.X509_CERT)
self.intermediates = str(sample.X509_IMDS_LIST)
self.private_key = str(sample.X509_CERT_KEY_ENCRYPTED)
self.private_key_passphrase = str(sample.X509_CERT_KEY_PASSPHRASE)
self._prepare()
container = containers.CertificateContainer(
@ -93,9 +92,9 @@ class TestBarbicanCert(base.TestCase):
# Validate the cert functions
self.assertEqual(cert.get_certificate(),
six.b(six.text_type(sample.X509_CERT)))
octavia_utils.b(str(sample.X509_CERT)))
self.assertEqual(cert.get_intermediates(), sample.X509_IMDS_LIST)
self.assertEqual(cert.get_private_key(), six.b(six.text_type(
self.assertEqual(cert.get_private_key(), octavia_utils.b(str(
sample.X509_CERT_KEY_ENCRYPTED)))
self.assertEqual(cert.get_private_key_passphrase(),
six.b(sample.X509_CERT_KEY_PASSPHRASE))
octavia_utils.b(sample.X509_CERT_KEY_PASSPHRASE))

View File

@ -16,11 +16,11 @@ from unittest import mock
from barbicanclient.v1 import containers
from barbicanclient.v1 import secrets
from oslo_utils import uuidutils
import six
import octavia.certificates.common.barbican as barbican_common
import octavia.certificates.common.cert as cert
import octavia.certificates.manager.barbican_legacy as barbican_cert_mgr
from octavia.common import utils as octavia_utils
import octavia.tests.common.sample_certs as sample
import octavia.tests.unit.base as base
@ -209,7 +209,7 @@ class TestBarbicanManager(base.TestCase):
self.assertEqual(data.get_intermediates(),
sample.X509_IMDS_LIST)
self.assertEqual(data.get_private_key_passphrase(),
six.b(self.private_key_passphrase.payload))
octavia_utils.b(self.private_key_passphrase.payload))
def test_get_cert_no_registration(self):
self.bc.containers.get.return_value = self.container
@ -234,7 +234,7 @@ class TestBarbicanManager(base.TestCase):
self.assertEqual(data.get_intermediates(),
sample.X509_IMDS_LIST)
self.assertEqual(data.get_private_key_passphrase(),
six.b(self.private_key_passphrase.payload))
octavia_utils.b(self.private_key_passphrase.payload))
def test_get_cert_no_registration_raise_on_secret_access_failure(self):
self.bc.containers.get.return_value = self.container

View File

@ -18,7 +18,6 @@ from unittest import mock
from oslo_config import cfg
from oslo_utils import uuidutils
import six
import sqlalchemy
from octavia.common import constants
@ -259,21 +258,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
operating_status=constants.ONLINE)
@ -310,21 +307,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
operating_status=constants.ONLINE)
@ -347,8 +342,7 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
@ -391,8 +385,7 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners')):
for listener_id, listener in health.get('listeners').items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
@ -428,22 +421,20 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
# We should not double process a shared pool
self.hm.pool_repo.update.assert_called_once_with(
self.session_mock, pool_id,
operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
operating_status=constants.ONLINE)
@ -472,21 +463,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(health.get('pools', {})):
for pool_id, pool in health.get('pools', {}).items():
# We should not double process a shared pool
self.hm.pool_repo.update.assert_called_once_with(
self.session_mock, 'pool-id-1',
operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
operating_status=constants.ONLINE)
@ -517,8 +506,7 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
@ -548,21 +536,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
@ -588,21 +574,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
@ -628,14 +612,13 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
@ -665,21 +648,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
operating_status=constants.DEGRADED)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
@ -708,14 +689,13 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
@ -749,14 +729,13 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
@ -789,21 +768,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
@ -832,21 +809,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
@ -877,21 +852,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.DEGRADED)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
operating_status=constants.DEGRADED)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
@ -928,21 +901,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
operating_status=constants.ERROR)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,
@ -1075,21 +1046,19 @@ class TestUpdateHealthDb(base.TestCase):
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
for listener_id, listener in health.get('listeners', {}).items():
self.listener_repo.update.assert_any_call(
self.session_mock, listener_id,
operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
for pool_id, pool in listener.get('pools', {}).items():
self.hm.pool_repo.update.assert_any_call(
self.session_mock, pool_id,
operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
pool.get('members', {})):
for member_id, member in pool.get('members', {}).items():
self.member_repo.update.assert_any_call(
self.session_mock, member_id,

View File

@ -473,10 +473,10 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_listener_repo_get,
mock_listener_repo_update,
mock_amphora_repo_update):
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
pem_file_mock = fer.encrypt(
utils.get_six_compatible_value('test-pem-file'))
utils.get_compatible_value('test-pem-file'))
amphora_cert_upload_mock = amphora_driver_tasks.AmphoraCertUpload()
amphora_cert_upload_mock.execute(_amphora_mock, pem_file_mock)

View File

@ -29,11 +29,11 @@ class TestCertTasks(base.TestCase):
@mock.patch('stevedore.driver.DriverManager.driver')
def test_execute(self, mock_driver):
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
dummy_cert = local.LocalCert(
utils.get_six_compatible_value('test_cert'),
utils.get_six_compatible_value('test_key'))
utils.get_compatible_value('test_cert'),
utils.get_compatible_value('test_key'))
mock_driver.generate_cert_key_pair.side_effect = [dummy_cert]
c = cert_task.GenerateServerPEMTask()
pem = c.execute('123')

View File

@ -357,7 +357,7 @@ class TestComputeTasks(base.TestCase):
def test_compute_create_cert(self, mock_driver, mock_conf, mock_jinja,
mock_log_cfg):
createcompute = compute_tasks.CertComputeCreate()
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
mock_log_cfg.return_value = 'FAKE CFG'
@ -366,7 +366,7 @@ class TestComputeTasks(base.TestCase):
self.useFixture(test_utils.OpenFixture(path, 'test'))
# Test execute()
test_cert = fer.encrypt(
utils.get_six_compatible_value('test_cert')
utils.get_compatible_value('test_cert')
)
compute_id = createcompute.execute(_amphora_mock.id, test_cert,
server_group_id=SERVER_GRPOUP_ID

View File

@ -1071,10 +1071,10 @@ class TestDatabaseTasks(base.TestCase):
mock_get_cert_exp):
update_amp_cert = database_tasks.UpdateAmphoraDBCertExpiration()
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
_pem_mock = fer.encrypt(
utils.get_six_compatible_value('test_cert')
utils.get_compatible_value('test_cert')
)
update_amp_cert.execute(_amphora_mock.id, _pem_mock)

View File

@ -504,11 +504,11 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_listener_repo_update,
mock_amphora_repo_get,
mock_amphora_repo_update):
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
mock_amphora_repo_get.return_value = _db_amphora_mock
fer = fernet.Fernet(key)
pem_file_mock = fer.encrypt(
utils.get_six_compatible_value('test-pem-file')).decode('utf-8')
utils.get_compatible_value('test-pem-file')).decode('utf-8')
amphora_cert_upload_mock = amphora_driver_tasks.AmphoraCertUpload()
amphora_cert_upload_mock.execute(_amphora_mock, pem_file_mock)

View File

@ -29,11 +29,11 @@ class TestCertTasks(base.TestCase):
@mock.patch('stevedore.driver.DriverManager.driver')
def test_execute(self, mock_driver):
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
dummy_cert = local.LocalCert(
utils.get_six_compatible_value('test_cert'),
utils.get_six_compatible_value('test_key'))
utils.get_compatible_value('test_cert'),
utils.get_compatible_value('test_key'))
mock_driver.generate_cert_key_pair.side_effect = [dummy_cert]
c = cert_task.GenerateServerPEMTask()
pem = c.execute('123')

View File

@ -369,7 +369,7 @@ class TestComputeTasks(base.TestCase):
def test_compute_create_cert(self, mock_driver, mock_conf, mock_jinja,
mock_log_cfg):
createcompute = compute_tasks.CertComputeCreate()
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
mock_log_cfg.return_value = 'FAKE CFG'
@ -378,7 +378,7 @@ class TestComputeTasks(base.TestCase):
self.useFixture(test_utils.OpenFixture(path, 'test'))
# Test execute()
test_cert = fer.encrypt(
utils.get_six_compatible_value('test_cert')
utils.get_compatible_value('test_cert')
).decode('utf-8')
compute_id = createcompute.execute(_db_amphora_mock.id, test_cert,
server_group_id=SERVER_GRPOUP_ID

View File

@ -1167,10 +1167,10 @@ class TestDatabaseTasks(base.TestCase):
mock_get_cert_exp):
update_amp_cert = database_tasks.UpdateAmphoraDBCertExpiration()
key = utils.get_six_compatible_server_certs_key_passphrase()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
_pem_mock = fer.encrypt(
utils.get_six_compatible_value('test_cert')
utils.get_compatible_value('test_cert')
).decode('utf-8')
update_amp_cert.execute(_db_amphora_mock.id, _pem_mock)

View File

@ -167,13 +167,6 @@ class HackingTestCase(base.BaseTestCase):
self.assertEqual(0, len(list(checks.no_log_warn(
"LOG.warning()"))))
def test_no_xrange(self):
self.assertEqual(1, len(list(checks.no_xrange(
"xrange(45)"))))
self.assertEqual(0, len(list(checks.no_xrange(
"range(45)"))))
def test_no_log_translations(self):
for log in checks._all_log_levels:
for hint in checks._all_hints:
@ -205,23 +198,6 @@ class HackingTestCase(base.BaseTestCase):
self.assertLinePasses(f, "raise KeyError('Error text')",
'neutron_lib/tests/unit/mytest.py')
def test_check_no_basestring(self):
self.assertEqual(1, len(list(checks.check_no_basestring(
"isinstance('foo', basestring)"))))
self.assertEqual(0, len(list(checks.check_no_basestring(
"isinstance('foo', six.string_types)"))))
def test_dict_iteritems(self):
self.assertEqual(1, len(list(checks.check_python3_no_iteritems(
"obj.iteritems()"))))
self.assertEqual(0, len(list(checks.check_python3_no_iteritems(
"six.iteritems(obj)"))))
self.assertEqual(0, len(list(checks.check_python3_no_iteritems(
"obj.items()"))))
def test_check_no_eventlet_imports(self):
f = checks.check_no_eventlet_imports
self.assertLinePasses(f, 'from not_eventlet import greenthread')

View File

@ -14,11 +14,8 @@
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class VolumeBase(object):
class VolumeBase(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def create_volume_from_image(self, image_id):

View File

@ -14,7 +14,6 @@ keystoneauth1>=3.4.0 # Apache-2.0
keystonemiddleware>=4.17.0 # Apache-2.0
python-neutronclient>=6.7.0 # Apache-2.0
WebOb>=1.7.1 # MIT
six>=1.10.0 # MIT
stevedore>=1.20.0 # Apache-2.0
oslo.config>=5.2.0 # Apache-2.0
oslo.context>=2.19.2 # Apache-2.0

View File

@ -26,7 +26,6 @@ from cryptography import x509
from pyasn1.codec.der import decoder as der_decoder
from pyasn1.codec.der import encoder as der_encoder
from pyasn1_modules import rfc2315
import six
PKCS7_BEG = """-----BEGIN PKCS7-----"""
@ -44,8 +43,7 @@ def _read_pem_blocks(data, *markers):
enumerate(map(lambda x: x[1], markers))))
idx = -1
state = stSpam
if six.PY3:
data = str(data, encoding="UTF-8")
data = data.decode('utf-8')
for certLine in data.replace('\r', '').split('\n'):
if not certLine:
break
@ -62,10 +60,6 @@ def _read_pem_blocks(data, *markers):
else:
certLines.append(certLine)
if state == stDump:
if six.PY2:
yield ''.join([
base64.b64decode(x) for x in certLines])
elif six.PY3:
yield ''.encode().join([
base64.b64decode(x) for x in certLines])
state = stSpam
@ -87,20 +81,17 @@ def _process_pkcs7_substrate(substrate):
for blob in content.getComponentByName('certificates'):
cert = x509.load_der_x509_certificate(der_encoder.encode(blob),
backends.default_backend())
six.print_(cert.public_bytes(
print(cert.public_bytes(
encoding=serialization.Encoding.PEM).decode(
'unicode_escape'), end='')
# Main program code
if len(sys.argv) != 1:
six.print_('Usage: cat <pkcs7_bundle.p7b> | %s' % sys.argv[0])
print('Usage: cat <pkcs7_bundle.p7b> | %s' % sys.argv[0])
sys.exit(-1)
# Need to read in binary bytes in case DER encoding of PKCS7 bundle
if six.PY2:
data = sys.stdin.read()
elif six.PY3:
data = sys.stdin.buffer.read()
# Look for PEM encoding