Enable more pep8 rules

... so that we can catch potential problems such as lack of module
imports more easily.

Change-Id: Ieeee9d763ac36ed95d90b6ab1b08d6b4c6f26d7c
This commit is contained in:
Takashi Kajinami 2024-09-28 21:10:26 +09:00
parent bd593b8df7
commit 8ebec483df
25 changed files with 200 additions and 213 deletions

View File

@ -22,7 +22,7 @@ import sys
import tempfile import tempfile
import packstack import packstack
from .utils import get_current_user from packstack.installer.utils import get_current_user
APP_NAME = "Packstack" APP_NAME = "Packstack"
@ -59,7 +59,7 @@ finally:
if uid != 0 and os.getuid() == 0: if uid != 0 and os.getuid() == 0:
try: try:
os.chown(PACKSTACK_VAR_DIR, uid, gid) os.chown(PACKSTACK_VAR_DIR, uid, gid)
except Exception as ex: except Exception:
print('Unable to change owner of %s. Please fix ownership ' print('Unable to change owner of %s. Please fix ownership '
'manually and try again.' % PACKSTACK_VAR_DIR) 'manually and try again.' % PACKSTACK_VAR_DIR)
sys.exit(1) sys.exit(1)

View File

@ -12,14 +12,14 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
import os import os
import shutil import shutil
import stat import stat
import uuid
import time
import logging
import tarfile import tarfile
import tempfile import tempfile
import time
import uuid
from .. import utils from .. import utils
@ -347,8 +347,8 @@ class PackstackDrone(SshTarballTransferMixin, Drone):
super(PackstackDrone, self).init_node() super(PackstackDrone, self).init_node()
server = utils.ScriptRunner(self.node) server = utils.ScriptRunner(self.node)
for pkg in ("puppet", "openssh-clients", "tar"): for pkg in ("puppet", "openssh-clients", "tar"):
server.append("rpm -q --whatprovides %(pkg)s || " server.append(f"rpm -q --whatprovides {pkg} || "
"yum install -y %(pkg)s" % locals()) f"yum install -y {pkg}")
server.execute() server.execute()
def add_resource(self, path, resource_type=None): def add_resource(self, path, resource_type=None):
@ -373,7 +373,7 @@ class PackstackDrone(SshTarballTransferMixin, Drone):
local.execute(log=False) local.execute(log=False)
# if we got to this point the puppet apply has finished # if we got to this point the puppet apply has finished
return True return True
except utils.ScriptRuntimeError as e: except utils.ScriptRuntimeError:
# the test raises an exception if the file doesn't exist yet # the test raises an exception if the file doesn't exist yet
return False return False
@ -402,9 +402,9 @@ class PackstackDrone(SshTarballTransferMixin, Drone):
rdir = self.resource_dir rdir = self.resource_dir
mdir = self._module_dir mdir = self._module_dir
server.append( server.append(
"( flock %(rdir)s/ps.lock " f"( flock {rdir}/ps.lock "
"puppet apply %(loglevel)s --modulepath %(mdir)s " f"puppet apply {loglevel} --modulepath {mdir} "
"%(recipe)s > %(running)s 2>&1 < /dev/null; " f"{recipe} > {running} 2>&1 < /dev/null; "
"mv %(running)s %(finished)s ) " f"mv {running} {finished} ) "
"> /dev/null 2>&1 < /dev/null &" % locals()) "> /dev/null 2>&1 < /dev/null &")
server.execute() server.execute()

View File

@ -16,8 +16,8 @@
Base class for steps & sequences Base class for steps & sequences
""" """
import sys
import logging import logging
import sys
import traceback import traceback
from .. import utils from .. import utils
@ -48,7 +48,7 @@ class Step(object):
# execute and report state # execute and report state
try: try:
self.function(config, messages) self.function(config, messages)
except Exception as ex: except Exception:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
state = utils.state_message(self.title, 'ERROR', 'red') state = utils.state_message(self.title, 'ERROR', 'red')
sys.stdout.write('%s\n' % state) sys.stdout.write('%s\n' % state)

View File

@ -16,14 +16,14 @@ import copy
import datetime import datetime
from functools import cmp_to_key from functools import cmp_to_key
import getpass import getpass
import io
import logging import logging
from io import StringIO
import os import os
import re import re
import sys import sys
import textwrap
import traceback import traceback
import types import types
import textwrap
from optparse import OptionGroup from optparse import OptionGroup
from optparse import OptionParser from optparse import OptionParser
@ -44,7 +44,7 @@ controller = Controller()
commandLineValues = {} commandLineValues = {}
# List to hold all values to be masked in logging (i.e. passwords and sensitive data) # List to hold all values to be masked in logging (i.e. passwords and sensitive data)
# TODO: read default values from conf_param? # TODO(tbd): read default values from conf_param?
masked_value_set = set() masked_value_set = set()
tmpfiles = [] tmpfiles = []
@ -94,12 +94,12 @@ def _getInputFromUser(param):
while loop: while loop:
# If the value was not supplied by the command line flags # If the value was not supplied by the command line flags
if param.CONF_NAME not in commandLineValues: if param.CONF_NAME not in commandLineValues:
message = StringIO() message = io.StringIO()
message.write(param.PROMPT) message.write(param.PROMPT)
val_list = param.VALIDATORS or [] val_list = param.VALIDATORS or []
if (validators.validate_regexp not in val_list if (validators.validate_regexp not in val_list and
and param.OPTION_LIST): param.OPTION_LIST):
message.write(" [%s]" % "|".join(param.OPTION_LIST)) message.write(" [%s]" % "|".join(param.OPTION_LIST))
if param.DEFAULT_VALUE: if param.DEFAULT_VALUE:
@ -182,7 +182,7 @@ def input_param(param):
def _askYesNo(question=None): def _askYesNo(question=None):
message = StringIO() message = io.StringIO()
while True: while True:
askString = "\r%s? (yes|no): " % (question) askString = "\r%s? (yes|no): " % (question)
@ -365,18 +365,17 @@ def _loadParamFromFile(config, section, param_name):
if value is None: if value is None:
value = val value = val
if value != val: if value != val:
raise ValueError('Parameter %(param_name)s deprecates ' raise ValueError(f'Parameter {param_name} deprecates '
'following parameters:\n%(deprecated)s.\n' f'following parameters:\n{deprecated}.\n'
'Please either use parameter %(param_name)s ' f'Please either use parameter {param_name} '
'or use same value for all deprecated ' 'or use same value for all deprecated '
'parameters.' % locals()) 'parameters.')
if deprecated and value is not None: if deprecated and value is not None:
controller.MESSAGES.append('Deprecated parameter has been used ' controller.MESSAGES.append('Deprecated parameter has been used '
'in answer file. Please use parameter ' 'in answer file. Please use parameter '
'%(param_name)s next time. This ' f'{param_name} next time. This '
'parameter deprecates following ' 'parameter deprecates following '
'parameters: %(deprecated)s.' f'parameters: {deprecated}.')
% locals())
if value is None: if value is None:
# Let's use default value if we have one # Let's use default value if we have one
value = getattr(param, 'DEFAULT_VALUE', None) value = getattr(param, 'DEFAULT_VALUE', None)
@ -473,6 +472,7 @@ def _gettmpanswerfilepath():
path = os.path.abspath(os.path.join(p, "tmp-packstack-answers-%s.txt" % ts)) path = os.path.abspath(os.path.join(p, "tmp-packstack-answers-%s.txt" % ts))
tmpfiles.append(path) tmpfiles.append(path)
controller.MESSAGES.append(msg)
return path return path
@ -863,7 +863,6 @@ def initCmdLineParser():
for param in group.parameters.itervalues(): for param in group.parameters.itervalues():
cmdOption = param.CMD_OPTION cmdOption = param.CMD_OPTION
paramUsage = param.USAGE paramUsage = param.USAGE
optionsList = param.OPTION_LIST
useDefault = param.USE_DEFAULT useDefault = param.USE_DEFAULT
if not useDefault: if not useDefault:
@ -888,7 +887,6 @@ def printOptions():
for param in group.parameters.itervalues(): for param in group.parameters.itervalues():
cmdOption = param.CONF_NAME cmdOption = param.CONF_NAME
paramUsage = param.USAGE paramUsage = param.USAGE
optionsList = param.OPTION_LIST or ""
print("%s" % (("**%s**" % str(cmdOption)).ljust(30))) print("%s" % (("**%s**" % str(cmdOption)).ljust(30)))
print(" %s" % paramUsage + "\n") print(" %s" % paramUsage + "\n")
@ -1003,8 +1001,7 @@ def main():
logFile = initLogging(options.debug) logFile = initLogging(options.debug)
# Parse parameters # Parse parameters
runConfiguration = True conf_file = None
confFile = None
controller.CONF['DEFAULT_EXEC_TIMEOUT'] = options.timeout controller.CONF['DEFAULT_EXEC_TIMEOUT'] = options.timeout
controller.CONF['DRY_RUN'] = options.dry_run controller.CONF['DRY_RUN'] = options.dry_run
@ -1047,12 +1044,13 @@ def main():
msg = ('Please do not set --default-password ' msg = ('Please do not set --default-password '
'when specifying an answer file.') 'when specifying an answer file.')
raise FlagValidationError(msg) raise FlagValidationError(msg)
confFile = os.path.expanduser(options.answer_file) conf_file = os.path.expanduser(options.answer_file)
if not os.path.exists(confFile): if not os.path.exists(conf_file):
raise Exception(output_messages.ERR_NO_ANSWER_FILE % confFile) raise Exception(output_messages.ERR_NO_ANSWER_FILE %
conf_file)
else: else:
_set_command_line_values(options) _set_command_line_values(options)
_main(options, confFile, logFile) _main(options, conf_file, logFile)
except FlagValidationError as ex: except FlagValidationError as ex:
optParser.error(str(ex)) optParser.error(str(ex))

View File

@ -14,19 +14,19 @@
from .datastructures import SortedDict from .datastructures import SortedDict
from .decorators import retry from .decorators import retry
from .network import device_from_ip
from .network import force_ip
from .network import get_localhost_ip from .network import get_localhost_ip
from .network import host2ip from .network import host2ip
from .network import force_ip
from .network import device_from_ip
from .shell import execute from .shell import execute
from .shell import ScriptRunner from .shell import ScriptRunner
from .shortcuts import host_iter
from .shortcuts import hosts
from .shortcuts import get_current_user from .shortcuts import get_current_user
from .shortcuts import get_current_username from .shortcuts import get_current_username
from .shortcuts import host_iter
from .shortcuts import hosts
from .shortcuts import split_hosts from .shortcuts import split_hosts
from .strings import COLORS
from .strings import color_text from .strings import color_text
from .strings import COLORS
from .strings import mask_string from .strings import mask_string
from .strings import state_format from .strings import state_format
from .strings import state_message from .strings import state_message
@ -34,8 +34,8 @@ from .strings import state_message
__all__ = ('SortedDict', __all__ = ('SortedDict',
'retry', 'retry',
'get_localhost_ip', 'host2ip', 'force_ip', 'device_from_ip', 'device_from_ip', 'force_ip', 'get_localhost_ip', 'host2ip',
'ScriptRunner', 'execute', 'ScriptRunner', 'execute',
'host_iter', 'hosts', 'get_current_user', 'get_current_username', 'get_current_user', 'get_current_username', 'host_iter', 'hosts',
'split_hosts', 'COLORS', 'color_text', 'mask_string', 'split_hosts', 'color_text', 'COLORS', 'mask_string',
'state_format', 'state_message') 'state_format', 'state_message')

View File

@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
import netifaces import netifaces
import socket import socket
import logging
from ..exceptions import NetworkError from ..exceptions import NetworkError
from .shell import ScriptRunner from .shell import ScriptRunner

View File

@ -16,10 +16,10 @@
Contains all core validation functions. Contains all core validation functions.
""" """
import logging
import os import os
import re import re
import socket import socket
import logging
from . import utils from . import utils

View File

@ -19,7 +19,7 @@ import re
from packstack.installer.exceptions import PuppetError from packstack.installer.exceptions import PuppetError
# TODO: Fill logger name when logging system will be refactored # TODO(tbd): Fill logger name when logging system will be refactored
logger = logging.getLogger() logger = logging.getLogger()
re_color = re.compile(r'\x1b.*?\d\dm') re_color = re.compile(r'\x1b.*?\d\dm')

View File

@ -138,7 +138,6 @@ def initSequences(controller):
# ------------------------ step functions ------------------------- # ------------------------ step functions -------------------------
def create_manifest(config, messages): def create_manifest(config, messages):
server = utils.ScriptRunner(config['CONFIG_AMQP_HOST'])
if config['CONFIG_AMQP_ENABLE_SSL'] == 'y': if config['CONFIG_AMQP_ENABLE_SSL'] == 'y':
config['CONFIG_AMQP_SSL_ENABLED'] = True config['CONFIG_AMQP_SSL_ENABLED'] = True
config['CONFIG_AMQP_PROTOCOL'] = 'ssl' config['CONFIG_AMQP_PROTOCOL'] = 'ssl'
@ -147,9 +146,7 @@ def create_manifest(config, messages):
service = 'AMQP' service = 'AMQP'
ssl_key_file = '/etc/pki/tls/private/ssl_amqp.key' ssl_key_file = '/etc/pki/tls/private/ssl_amqp.key'
ssl_cert_file = '/etc/pki/tls/certs/ssl_amqp.crt' ssl_cert_file = '/etc/pki/tls/certs/ssl_amqp.crt'
cacert = config['CONFIG_AMQP_SSL_CACERT_FILE'] = ( config['CONFIG_AMQP_SSL_CACERT_FILE'] = config['CONFIG_SSL_CACERT']
config['CONFIG_SSL_CACERT']
)
generate_ssl_cert(config, amqp_host, service, ssl_key_file, generate_ssl_cert(config, amqp_host, service, ssl_key_file,
ssl_cert_file) ssl_cert_file)
else: else:

View File

@ -649,15 +649,15 @@ def check_netapp_7modeiscsi_options(config):
def check_netapp_7mode_fc_options(config): def check_netapp_7mode_fc_options(config):
return (check_netapp_options(config) and return (check_netapp_options(config) and
config['CONFIG_CINDER_NETAPP_STORAGE_FAMILY'] == "ontap_7mode" config['CONFIG_CINDER_NETAPP_STORAGE_FAMILY'] == "ontap_7mode" and
and config['CONFIG_CINDER_NETAPP_STORAGE_PROTOCOL'] == "fc") config['CONFIG_CINDER_NETAPP_STORAGE_PROTOCOL'] == "fc")
def check_netapp_vserver_options(config): def check_netapp_vserver_options(config):
return (check_netapp_options(config) and return (
config['CONFIG_CINDER_NETAPP_STORAGE_FAMILY'] == "ontap_cluster" check_netapp_options(config) and
and config['CONFIG_CINDER_NETAPP_STORAGE_PROTOCOL'] in config['CONFIG_CINDER_NETAPP_STORAGE_FAMILY'] == "ontap_cluster" and
['nfs', 'iscsi']) config['CONFIG_CINDER_NETAPP_STORAGE_PROTOCOL'] in ['nfs', 'iscsi'])
def check_netapp_eseries_options(config): def check_netapp_eseries_options(config):

View File

@ -142,7 +142,6 @@ def create_manifest(config, messages):
proto = "http" proto = "http"
config["CONFIG_HORIZON_PORT"] = 80 config["CONFIG_HORIZON_PORT"] = 80
sslmanifestdata = ''
if config["CONFIG_HORIZON_SSL"] == 'y': if config["CONFIG_HORIZON_SSL"] == 'y':
config["CONFIG_HORIZON_PORT"] = 443 config["CONFIG_HORIZON_PORT"] = 443
proto = "https" proto = "https"

View File

@ -522,21 +522,23 @@ def initSequences(controller):
'Please set CONFIG_NEUTRON_INSTALL=y') 'Please set CONFIG_NEUTRON_INSTALL=y')
return return
has_ovs = 'openvswitch' in config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS']
has_ovn = 'ovn' in config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS']
has_lb = 'linuxbridge' in config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS']
if config['CONFIG_IRONIC_INSTALL'] == 'y': if config['CONFIG_IRONIC_INSTALL'] == 'y':
config['CONFIG_NEUTRON_ML2_TYPE_DRIVERS'] += ', flat' config['CONFIG_NEUTRON_ML2_TYPE_DRIVERS'] += ', flat'
config['CONFIG_NEUTRON_ML2_TENANT_NETWORK_TYPES'] += ', flat' config['CONFIG_NEUTRON_ML2_TENANT_NETWORK_TYPES'] += ', flat'
if 'openvswitch' not in config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS']: if not has_ovs:
config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS'] += ', openvswitch' config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS'] += ', openvswitch'
config['CONFIG_NEUTRON_ML2_FLAT_NETWORKS'] = 'extnet' config['CONFIG_NEUTRON_ML2_FLAT_NETWORKS'] = 'extnet'
if use_ml2_with_sriovnicswitch(config): if use_ml2_with_sriovnicswitch(config):
if ('openvswitch' not in config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS'] if (not has_ovs) and (not has_lb):
and 'linuxbridge' not in
config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS']):
config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS'] += ', openvswitch' config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS'] += ', openvswitch'
if use_ml2_with_ovn(config): if use_ml2_with_ovn(config):
if ('ovn' not in config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS']): if not has_ovn:
config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS'] = 'ovn' config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS'] = 'ovn'
# OVN only supports geneve encapsulation # OVN only supports geneve encapsulation
if ('geneve' not in config['CONFIG_NEUTRON_ML2_TYPE_DRIVERS']): if ('geneve' not in config['CONFIG_NEUTRON_ML2_TYPE_DRIVERS']):
@ -567,8 +569,7 @@ def initSequences(controller):
ovn_external = 'CONFIG_NEUTRON_OVN_EXTERNAL_PHYSNET' ovn_external = 'CONFIG_NEUTRON_OVN_EXTERNAL_PHYSNET'
config[ovs_external] = config[ovn_external] config[ovs_external] = config[ovn_external]
elif use_ml2_with_ovs(config): elif use_ml2_with_ovs(config):
if ('openvswitch' not in config[ if not has_ovs:
'CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS']):
config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS'] = 'openvswitch' config['CONFIG_NEUTRON_ML2_MECHANISM_DRIVERS'] = 'openvswitch'
plugin_db = 'neutron' plugin_db = 'neutron'
@ -740,8 +741,6 @@ def create_manifests(config, messages):
config['FIREWALL_DRIVER'] = ("neutron.agent.linux.iptables_firewall." config['FIREWALL_DRIVER'] = ("neutron.agent.linux.iptables_firewall."
"OVSHybridIptablesFirewallDriver") "OVSHybridIptablesFirewallDriver")
plugin_manifest = 'neutron_ml2_plugin'
if config['CONFIG_AMQP_ENABLE_SSL'] == 'y': if config['CONFIG_AMQP_ENABLE_SSL'] == 'y':
ssl_cert_file = config['CONFIG_NEUTRON_SSL_CERT'] = ( ssl_cert_file = config['CONFIG_NEUTRON_SSL_CERT'] = (
'/etc/pki/tls/certs/ssl_amqp_neutron.crt' '/etc/pki/tls/certs/ssl_amqp_neutron.crt'
@ -939,9 +938,7 @@ def create_l2_agent_manifests(config, messages):
config["CONFIG_NEUTRON_OVS_BRIDGE_IFACES_COMPUTE"] = [] config["CONFIG_NEUTRON_OVS_BRIDGE_IFACES_COMPUTE"] = []
no_local_types = set(ovs_type) & set(['gre', 'vxlan', 'vlan', 'flat']) no_local_types = set(ovs_type) & set(['gre', 'vxlan', 'vlan', 'flat'])
no_tunnel_types = set(ovs_type) & set(['vlan', 'flat']) no_tunnel_types = set(ovs_type) & set(['vlan', 'flat'])
elif agent == "linuxbridge": elif agent != "linuxbridge":
host_var = 'CONFIG_NEUTRON_LB_HOST'
else:
raise KeyError("Unknown layer2 agent") raise KeyError("Unknown layer2 agent")
for host in network_hosts | compute_hosts: for host in network_hosts | compute_hosts:
@ -950,8 +947,7 @@ def create_l2_agent_manifests(config, messages):
# only required if vlan or flat are enabled. # only required if vlan or flat are enabled.
if ( if (
agent in ["openvswitch", "ovn"] and ( agent in ["openvswitch", "ovn"] and (
(host in network_hosts and no_local_types) (host in network_hosts and no_local_types) or no_tunnel_types)
or no_tunnel_types)
): ):
if config['CONFIG_USE_SUBNETS'] == 'y': if config['CONFIG_USE_SUBNETS'] == 'y':
iface_arr = [ iface_arr = [

View File

@ -16,7 +16,6 @@
Installs and configures Nova Installs and configures Nova
""" """
import distro
import os import os
import socket import socket
@ -38,13 +37,6 @@ PLUGIN_NAME_COLORED = utils.color_text(PLUGIN_NAME, 'blue')
def initConfig(controller): def initConfig(controller):
if distro.linux_distribution()[0] == "Fedora":
primary_netif = "em1"
secondary_netif = "em2"
else:
primary_netif = "eth0"
secondary_netif = "eth1"
nova_params = { nova_params = {
"NOVA": [ "NOVA": [
{"CMD_OPTION": 'nova-db-purge-enable', {"CMD_OPTION": 'nova-db-purge-enable',
@ -392,7 +384,7 @@ def create_compute_manifest(config, messages):
) )
ssl_host = config['CONFIG_CONTROLLER_HOST'] ssl_host = config['CONFIG_CONTROLLER_HOST']
service = 'ceilometer' service = 'ceilometer'
generate_ssl_cert(config, host, service, ssl_key_file, generate_ssl_cert(config, ssl_host, service, ssl_key_file,
ssl_cert_file) ssl_cert_file)
fw_details = dict() fw_details = dict()
@ -444,7 +436,6 @@ def create_vncproxy_manifest(config, messages):
def create_common_manifest(config, messages): def create_common_manifest(config, messages):
global compute_hosts, network_hosts global compute_hosts, network_hosts
network_multi = len(network_hosts) > 1
dbacces_hosts = set([config.get('CONFIG_CONTROLLER_HOST')]) dbacces_hosts = set([config.get('CONFIG_CONTROLLER_HOST')])
dbacces_hosts |= network_hosts dbacces_hosts |= network_hosts

View File

@ -17,11 +17,10 @@ Plugin responsible for setting OpenStack global options
""" """
import distro import distro
import glob
import logging
import os import os
import re import re
import logging
import glob
import os
import uuid import uuid
from packstack.installer import basedefs from packstack.installer import basedefs
@ -936,15 +935,15 @@ def run_rhn_reg(host, server_url, username=None, password=None,
Registers given host to given RHN Satellite server. To successfully Registers given host to given RHN Satellite server. To successfully
register either activation_key or username/password is required. register either activation_key or username/password is required.
""" """
logging.debug('Setting RHN Satellite server: %s.' % locals()) logging.debug('Setting RHN Satellite server: %s.' % server_url)
mask = [] mask = []
cmd = ['/usr/sbin/rhnreg_ks'] cmd = ['/usr/sbin/rhnreg_ks']
server = utils.ScriptRunner(host) server = utils.ScriptRunner(host)
# check satellite server url # check satellite server url
server_url = (server_url.rstrip('/').endswith('/XMLRPC') server_url = (server_url.rstrip('/').endswith('/XMLRPC') and
and server_url or '%s/XMLRPC' % server_url) server_url or '%s/XMLRPC' % server_url)
cmd.extend(['--serverUrl', server_url]) cmd.extend(['--serverUrl', server_url])
if activation_key: if activation_key:
@ -964,10 +963,9 @@ def run_rhn_reg(host, server_url, username=None, password=None,
location = "/etc/sysconfig/rhn/%s" % os.path.basename(cacert) location = "/etc/sysconfig/rhn/%s" % os.path.basename(cacert)
if not os.path.isfile(location): if not os.path.isfile(location):
logging.debug('Downloading cacert from %s.' % server_url) logging.debug('Downloading cacert from %s.' % server_url)
wget_cmd = ('ls %(location)s &> /dev/null && echo -n "" || ' wget_cmd = (f'ls {location} &> /dev/null && echo -n "" || '
'wget -nd --no-check-certificate --timeout=30 ' 'wget -nd --no-check-certificate --timeout=30 '
'--tries=3 -O "%(location)s" "%(cacert)s"' % f'--tries=3 -O "{location}" "{cacert}"')
locals())
server.append(wget_cmd) server.append(wget_cmd)
cmd.extend(['--sslCACert', location]) cmd.extend(['--sslCACert', location])
@ -1002,13 +1000,13 @@ def run_rhsm_reg(host, username, password, optional=False, proxy_server=None,
# configure proxy if it is necessary # configure proxy if it is necessary
if proxy_server: if proxy_server:
cmd = ('subscription-manager config ' cmd = ['subscription-manager', 'config',
'--server.proxy_hostname=%(proxy_server)s ' f'--server.proxy_hostname={proxy_server}',
'--server.proxy_port=%(proxy_port)s') f'--server.proxy_port={proxy_port}']
if proxy_user: if proxy_user:
cmd += (' --server.proxy_user=%(proxy_user)s ' cmd += [f'--server.proxy_user={proxy_user}',
'--server.proxy_password=%(proxy_password)s') f'--server.proxy_password={proxy_password}']
server.append(cmd % locals()) server.append(cmd.join(' '))
if sat6_server: if sat6_server:
# Register to Satellite 6 host # Register to Satellite 6 host
@ -1064,14 +1062,12 @@ def manage_centos_release_openstack(host, config):
if not match: if not match:
# No CentOS Cloud SIG repo, so we don't need to continue # No CentOS Cloud SIG repo, so we don't need to continue
return return
branch, version, release = match.group('branch'), match.group('version'), match.group('release') branch = match.group('branch')
package_name = ("centos-release-openstack-%(branch)s-%(version)s-"
"%(release)s" % locals())
server = utils.ScriptRunner(host) server = utils.ScriptRunner(host)
server.append("(rpm -q 'centos-release-openstack-%(branch)s' ||" server.append(
" yum -y install centos-release-openstack-%(branch)s) || true" f"(rpm -q 'centos-release-openstack-{branch}' ||"
% locals()) f" yum -y install centos-release-openstack-{branch}) || true")
try: try:
server.execute() server.execute()
@ -1100,13 +1096,12 @@ def manage_rdo(host, config):
dist_tag = '.el9s' dist_tag = '.el9s'
else: else:
dist_tag = '' dist_tag = ''
rdo_url = ("https://www.rdoproject.org/repos/openstack-%(version)s/" rdo_url = (f"https://www.rdoproject.org/repos/openstack-{version}/"
"rdo-release-%(version)s%(dist_tag)s.rpm" % locals()) f"rdo-release-{version}{dist_tag}.rpm")
server = utils.ScriptRunner(host) server = utils.ScriptRunner(host)
server.append("(rpm -q 'rdo-release-%(version)s' ||" server.append(f"(rpm -q 'rdo-release-{version}' ||"
" yum install -y --nogpg %(rdo_url)s) || true" f" yum install -y --nogpg {rdo_url}) || true")
% locals())
try: try:
server.execute() server.execute()
except exceptions.ScriptRuntimeError as ex: except exceptions.ScriptRuntimeError as ex:
@ -1115,11 +1110,11 @@ def manage_rdo(host, config):
reponame = 'openstack-%s' % version reponame = 'openstack-%s' % version
server.clear() server.clear()
server.append('yum-config-manager --enable %(reponame)s' % locals()) server.append('yum-config-manager --enable %s' % reponame)
if config['CONFIG_ENABLE_RDO_TESTING'] == 'y': if config['CONFIG_ENABLE_RDO_TESTING'] == 'y':
server.append('yum-config-manager --disable %(reponame)s' % locals()) server.append('yum-config-manager --disable %s' % reponame)
server.append('yum-config-manager --enable %(reponame)s-testing' % locals()) server.append('yum-config-manager --enable %s-testing' % reponame)
rc, out = server.execute() rc, out = server.execute()
match = re.search(r'enabled\s*=\s*(1|True)', out) match = re.search(r'enabled\s*=\s*(1|True)', out)
@ -1167,7 +1162,7 @@ def choose_ip_version(config, messages):
def install_keys_on_host(hostname, sshkeydata): def install_keys_on_host(hostname, sshkeydata):
server = utils.ScriptRunner(hostname) server = utils.ScriptRunner(hostname)
# TODO replace all that with ssh-copy-id # TODO(tbd) replace all that with ssh-copy-id
server.append("mkdir -p ~/.ssh") server.append("mkdir -p ~/.ssh")
server.append("chmod 500 ~/.ssh") server.append("chmod 500 ~/.ssh")
server.append("grep '%s' ~/.ssh/authorized_keys > /dev/null 2>&1 || " server.append("grep '%s' ~/.ssh/authorized_keys > /dev/null 2>&1 || "
@ -1324,11 +1319,10 @@ def server_prep(config, messages):
if CONFIG_REPO: if CONFIG_REPO:
for i, repourl in enumerate(CONFIG_REPO.split(',')): for i, repourl in enumerate(CONFIG_REPO.split(',')):
reponame = 'packstack_%d' % i reponame = 'packstack_%d' % i
server.append('echo "[%(reponame)s]\nname=%(reponame)s\n' server.append(f'echo "[{reponame}]\nname={reponame}\n'
'baseurl=%(repourl)s\nenabled=1\n' f'baseurl={repourl}\nenabled=1\n'
'priority=1\ngpgcheck=0"' 'priority=1\ngpgcheck=0"'
' > /etc/yum.repos.d/%(reponame)s.repo' f' > /etc/yum.repos.d/{reponame}.repo')
% locals())
server.append("yum clean metadata") server.append("yum clean metadata")
server.execute() server.execute()

View File

@ -16,9 +16,9 @@
Installs and configures Puppet Installs and configures Puppet
""" """
import sys
import logging import logging
import os import os
import sys
import time import time
from packstack.installer import utils from packstack.installer import utils

View File

@ -16,9 +16,9 @@
Installs and configures Swift Installs and configures Swift
""" """
import netaddr
import re import re
import uuid import uuid
import netaddr
from packstack.installer import basedefs from packstack.installer import basedefs
from packstack.installer import validators from packstack.installer import validators
@ -262,7 +262,6 @@ def create_builder_manifest(config, messages):
('SWIFT_RING_ACCOUNT_DEVICES', 'ring_account_device', 6002)]): ('SWIFT_RING_ACCOUNT_DEVICES', 'ring_account_device', 6002)]):
swift_dev_details = dict() swift_dev_details = dict()
host = utils.force_ip(config['CONFIG_STORAGE_HOST_URL']) host = utils.force_ip(config['CONFIG_STORAGE_HOST_URL'])
fstype = config["CONFIG_SWIFT_STORAGE_FSTYPE"]
for device in devices: for device in devices:
devicename = device['device_name'] devicename = device['device_name']
key = "dev_%s_%s" % (host, devicename) key = "dev_%s_%s" % (host, devicename)

View File

@ -22,21 +22,13 @@ import tempfile
from unittest import TestCase from unittest import TestCase
from ..test_base import PackstackTestCaseMixin from ..test_base import PackstackTestCaseMixin
from packstack.installer.core.drones import * from packstack.installer.core import drones
class SshTarballTransferMixinTestCase(PackstackTestCaseMixin, TestCase): class SshTarballTransferMixinTestCase(PackstackTestCaseMixin, TestCase):
def setUp(self): def setUp(self):
# Creating a temp directory that can be used by tests # Creating a temp directory that can be used by tests
self.tempdir = tempfile.mkdtemp() self.tempdir = tempfile.mkdtemp()
def tearDown(self):
# remove the temp directory
# shutil.rmtree(self.tempdir)
pass
def setUp(self):
self.tempdir = tempfile.mkdtemp()
# prepare resource files # prepare resource files
res1path = os.path.join(self.tempdir, 'res1.txt') res1path = os.path.join(self.tempdir, 'res1.txt')
with open(res1path, 'w') as f: with open(res1path, 'w') as f:
@ -56,7 +48,7 @@ class SshTarballTransferMixinTestCase(PackstackTestCaseMixin, TestCase):
with open(rec2path, 'w') as f: with open(rec2path, 'w') as f:
f.write('recipe two') f.write('recipe two')
# prepare class # prepare class
self.mixin = SshTarballTransferMixin() self.mixin = drones.SshTarballTransferMixin()
self.mixin.node = '127.0.0.1' self.mixin.node = '127.0.0.1'
self.mixin.resource_dir = os.path.join(self.tempdir, 'remote') self.mixin.resource_dir = os.path.join(self.tempdir, 'remote')
self.mixin.recipe_dir = os.path.join(self.tempdir, 'remote', self.mixin.recipe_dir = os.path.join(self.tempdir, 'remote',
@ -71,6 +63,11 @@ class SshTarballTransferMixinTestCase(PackstackTestCaseMixin, TestCase):
self.mixin.local_tmpdir, self.mixin.remote_tmpdir): self.mixin.local_tmpdir, self.mixin.remote_tmpdir):
os.mkdir(i) os.mkdir(i)
def tearDown(self):
# remove the temp directory
# shutil.rmtree(self.tempdir)
pass
def test_tarball_packing(self): def test_tarball_packing(self):
""" """
Tests packing of resources and recipes Tests packing of resources and recipes
@ -126,7 +123,7 @@ class SshTarballTransferMixinTestCase(PackstackTestCaseMixin, TestCase):
''' '''
class FakeDroneObserver(DroneObserver): class FakeDroneObserver(drones.DroneObserver):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(FakeDroneObserver, self).__init__(*args, **kwargs) super(FakeDroneObserver, self).__init__(*args, **kwargs)
self.log = [] self.log = []
@ -153,7 +150,7 @@ class FakeDroneObserver(DroneObserver):
self.log.append('finished:%s' % recipe) self.log.append('finished:%s' % recipe)
class FakeDrone(Drone): class FakeDrone(drones.Drone):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(FakeDrone, self).__init__(*args, **kwargs) super(FakeDrone, self).__init__(*args, **kwargs)
self.log = [] self.log = []

View File

@ -17,7 +17,8 @@
import os import os
from unittest import TestCase from unittest import TestCase
from packstack.installer.processors import *
from packstack.installer import processors
from ..test_base import PackstackTestCaseMixin from ..test_base import PackstackTestCaseMixin
@ -25,12 +26,13 @@ from ..test_base import PackstackTestCaseMixin
class ProcessorsTestCase(PackstackTestCaseMixin, TestCase): class ProcessorsTestCase(PackstackTestCaseMixin, TestCase):
def test_process_host(self): def test_process_host(self):
"""Test packstack.installer.processors.process_host.""" """Test packstack.installer.processors.process_host."""
proc_local = process_host('localhost', 'HOSTNAME') proc_local = processors.process_host('localhost', 'HOSTNAME')
self.assertIn(proc_local, ['127.0.0.1', '::1']) self.assertIn(proc_local, ['127.0.0.1', '::1'])
def test_process_ssh_key(self): def test_process_ssh_key(self):
"""Test packstack.installer.processors.process_ssh_key.""" """Test packstack.installer.processors.process_ssh_key."""
path = process_ssh_key(os.path.join(self.tempdir, 'id_rsa'), 'SSH_KEY') path = processors.process_ssh_key(
os.path.join(self.tempdir, 'id_rsa'), 'SSH_KEY')
# test if key was created # test if key was created
self.assertTrue(bool(path)) self.assertTrue(bool(path))
# test if key exists # test if key exists

View File

@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import distro
import os import os
import shutil import shutil
import subprocess import subprocess
@ -26,7 +27,6 @@ from packstack.modules import puppet
from packstack.installer import basedefs from packstack.installer import basedefs
from packstack.installer import run_setup from packstack.installer import run_setup
from packstack.installer import validators from packstack.installer import validators
from packstack.plugins.nova_300 import distro
from ..test_base import FakePopen from ..test_base import FakePopen
from ..test_base import PackstackTestCaseMixin from ..test_base import PackstackTestCaseMixin

View File

@ -20,7 +20,7 @@ import sys
from unittest import TestCase from unittest import TestCase
from packstack.installer import utils from packstack.installer import utils
from packstack.installer.core.sequences import * from packstack.installer.core import sequences
from ..test_base import PackstackTestCaseMixin from ..test_base import PackstackTestCaseMixin
@ -43,7 +43,7 @@ class StepTestCase(PackstackTestCaseMixin, TestCase):
if 'test' not in config: if 'test' not in config:
raise AssertionError('Missing config value.') raise AssertionError('Missing config value.')
step = Step('test', func, title='Running test') step = sequences.Step('test', func, title='Running test')
step.run(config={'test': 'test'}) step.run(config={'test': 'test'})
contents = sys.stdout.getvalue() contents = sys.stdout.getvalue()
@ -66,8 +66,8 @@ class SequenceTestCase(PackstackTestCaseMixin, TestCase):
{'name': '3', 'function': lambda x, y: True, {'name': '3', 'function': lambda x, y: True,
'title': 'Step 3'}] 'title': 'Step 3'}]
self.seq = Sequence('test', self.steps, condition='test', self.seq = sequences.Sequence('test', self.steps, condition='test',
cond_match='test') cond_match='test')
def tearDown(self): def tearDown(self):
super(SequenceTestCase, self).tearDown() super(SequenceTestCase, self).tearDown()

View File

@ -22,7 +22,7 @@ Test cases for packstack.installer.core.parameters module.
from unittest import TestCase from unittest import TestCase
from ..test_base import PackstackTestCaseMixin from ..test_base import PackstackTestCaseMixin
from packstack.installer.core.parameters import * from packstack.installer.core import parameters
class ParameterTestCase(PackstackTestCaseMixin, TestCase): class ParameterTestCase(PackstackTestCaseMixin, TestCase):
@ -48,7 +48,7 @@ class ParameterTestCase(PackstackTestCaseMixin, TestCase):
Test packstack.installer.core.parameters.Parameter Test packstack.installer.core.parameters.Parameter
initialization initialization
""" """
param = Parameter(self.data) param = parameters.Parameter(self.data)
for key, value in self.data.items(): for key, value in self.data.items():
self.assertEqual(getattr(param, key), value) self.assertEqual(getattr(param, key), value)
@ -56,7 +56,7 @@ class ParameterTestCase(PackstackTestCaseMixin, TestCase):
""" """
Test packstack.installer.core.parameters.Parameter default value Test packstack.installer.core.parameters.Parameter default value
""" """
param = Parameter() param = parameters.Parameter()
self.assertIsNone(param.PROCESSORS) self.assertIsNone(param.PROCESSORS)
@ -79,7 +79,7 @@ class GroupTestCase(PackstackTestCaseMixin, TestCase):
""" """
Test packstack.installer.core.parameters.Group initialization Test packstack.installer.core.parameters.Group initialization
""" """
group = Group(attributes=self.attrs, parameters=self.params) group = parameters.Group(attributes=self.attrs, parameters=self.params)
for key, value in self.attrs.items(): for key, value in self.attrs.items():
self.assertEqual(getattr(group, key), value) self.assertEqual(getattr(group, key), value)
for param in self.params: for param in self.params:
@ -89,8 +89,8 @@ class GroupTestCase(PackstackTestCaseMixin, TestCase):
""" """
Test packstack.installer.core.parameters.Group search method Test packstack.installer.core.parameters.Group search method
""" """
group = Group(attributes=self.attrs, parameters=self.params) group = parameters.Group(attributes=self.attrs, parameters=self.params)
param_list = group.search('PROMPT', 'find_me') param_list = group.search('PROMPT', 'find_me')
self.assertEqual(len(param_list), 1) self.assertEqual(len(param_list), 1)
self.assertIsInstance(param_list[0], Parameter) self.assertIsInstance(param_list[0], parameters.Parameter)
self.assertEqual(param_list[0].CONF_NAME, 'CONFIG_MARIADB_HOST') self.assertEqual(param_list[0].CONF_NAME, 'CONFIG_MARIADB_HOST')

View File

@ -25,7 +25,7 @@ from unittest import TestCase
from ..test_base import FakePopen from ..test_base import FakePopen
from ..test_base import PackstackTestCaseMixin from ..test_base import PackstackTestCaseMixin
from packstack.installer.utils import * from packstack.installer import utils
from packstack.installer.utils.strings import STR_MASK from packstack.installer.utils.strings import STR_MASK
from packstack.installer.exceptions import ExecuteRuntimeError from packstack.installer.exceptions import ExecuteRuntimeError
@ -46,17 +46,17 @@ class ParameterTestCase(PackstackTestCaseMixin, TestCase):
def test_sorteddict(self): def test_sorteddict(self):
"""Test packstack.installer.utils.datastructures.SortedDict.""" """Test packstack.installer.utils.datastructures.SortedDict."""
sdict = SortedDict() sdict = utils.SortedDict()
sdict['1'] = 1 sdict['1'] = 1
sdict['2'] = 2 sdict['2'] = 2
sdict.update(SortedDict([('3', 3), ('4', 4), ('5', 5)])) sdict.update(utils.SortedDict([('3', 3), ('4', 4), ('5', 5)]))
self.assertListEqual(sdict.keys(), ['1', '2', '3', '4', '5']) self.assertListEqual(sdict.keys(), ['1', '2', '3', '4', '5'])
self.assertListEqual(sdict.values(), [1, 2, 3, 4, 5]) self.assertListEqual(sdict.values(), [1, 2, 3, 4, 5])
def test_retry(self): def test_retry(self):
"""Test packstack.installer.utils.decorators.retry.""" """Test packstack.installer.utils.decorators.retry."""
@retry(count=3, delay=0, retry_on=ValueError) @utils.retry(count=3, delay=0, retry_on=ValueError)
def test_sum(): def test_sum():
global cnt global cnt
cnt += 1 cnt += 1
@ -74,44 +74,44 @@ class ParameterTestCase(PackstackTestCaseMixin, TestCase):
def test_network(self): def test_network(self):
"""Test packstack.installer.utils.network functions.""" """Test packstack.installer.utils.network functions."""
self.assertIn(host2ip('localhost', allow_localhost=True), self.assertIn(utils.host2ip('localhost', allow_localhost=True),
['127.0.0.1', '::1']) ['127.0.0.1', '::1'])
def test_shell(self): def test_shell(self):
"""Test packstack.installer.utils.shell functions.""" """Test packstack.installer.utils.shell functions."""
rc, out = execute(['echo', 'this is test']) rc, out = utils.execute(['echo', 'this is test'])
self.assertEqual(out.strip(), 'this is test') self.assertEqual(out.strip(), 'this is test')
rc, out = execute('echo "this is test"', use_shell=True) rc, out = utils.execute('echo "this is test"', use_shell=True)
self.assertEqual(out.strip(), 'this is test') self.assertEqual(out.strip(), 'this is test')
try: try:
execute('echo "mask the password" && exit 1', utils.execute('echo "mask the password" && exit 1',
use_shell=True, mask_list=['password']) use_shell=True, mask_list=['password'])
raise AssertionError('Masked execution failed.') raise AssertionError('Masked execution failed.')
except ExecuteRuntimeError as ex: except ExecuteRuntimeError as ex:
should_be = ('Failed to execute command, stdout: mask the %s\n\n' should_be = ('Failed to execute command, stdout: mask the %s\n\n'
'stderr: ' % STR_MASK) 'stderr: ' % STR_MASK)
self.assertEqual(str(ex), should_be) self.assertEqual(str(ex), should_be)
script = ScriptRunner() script = utils.ScriptRunner()
script.append('echo "this is test"') script.append('echo "this is test"')
rc, out = script.execute() rc, out = script.execute()
self.assertEqual(out.strip(), 'this is test') self.assertEqual(out.strip(), 'this is test')
def test_strings(self): def test_strings(self):
"""Test packstack.installer.utils.strings functions.""" """Test packstack.installer.utils.strings functions."""
self.assertEqual(color_text('test text', 'red'), self.assertEqual(utils.color_text('test text', 'red'),
'\033[0;31mtest text\033[0m') '\033[0;31mtest text\033[0m')
self.assertEqual(mask_string('test text', mask_list=['text']), self.assertEqual(utils.mask_string('test text', mask_list=['text']),
'test %s' % STR_MASK) 'test %s' % STR_MASK)
masked = mask_string("test '\\''text'\\''", masked = utils.mask_string("test '\\''text'\\''",
mask_list=["'text'"], mask_list=["'text'"],
replace_list=[("'", "'\\''")]) replace_list=[("'", "'\\''")])
self.assertEqual(masked, 'test %s' % STR_MASK) self.assertEqual(masked, 'test %s' % STR_MASK)
def test_shortcuts(self): def test_shortcuts(self):
"""Test packstack.installer.utils.shortcuts functions.""" """Test packstack.installer.utils.shortcuts functions."""
conf = {"A_HOST": "1.1.1.1", "B_HOSTS": "2.2.2.2,1.1.1.1", conf = {"A_HOST": "1.1.1.1", "B_HOSTS": "2.2.2.2,1.1.1.1",
"C_HOSTS": "3.3.3.3/vdc"} "C_HOSTS": "3.3.3.3/vdc"}
hostlist = list(hosts(conf)) hostlist = list(utils.hosts(conf))
hostlist.sort() hostlist.sort()
self.assertEqual(['1.1.1.1', '2.2.2.2', '3.3.3.3'], hostlist) self.assertEqual(['1.1.1.1', '2.2.2.2', '3.3.3.3'], hostlist)

View File

@ -19,7 +19,8 @@ import os
import shutil import shutil
import tempfile import tempfile
from unittest import TestCase from unittest import TestCase
from packstack.installer.validators import * from packstack.installer import exceptions
from packstack.installer import validators
from ..test_base import PackstackTestCaseMixin from ..test_base import PackstackTestCaseMixin
@ -34,73 +35,87 @@ class ValidatorsTestCase(PackstackTestCaseMixin, TestCase):
shutil.rmtree(self.tempdir) shutil.rmtree(self.tempdir)
def test_validate_integer(self): def test_validate_integer(self):
"""Test packstack.installer.validators.validate_integer.""" """Test validate_integer."""
validate_integer('1') validators.validate_integer('1')
self.assertRaises(ParamValidationError, validate_integer, 'test') self.assertRaises(exceptions.ParamValidationError,
validators.validate_integer, 'test')
def test_validate_regexp(self): def test_validate_regexp(self):
"""Test packstack.installer.validators.validate_regexp.""" """Test validate_regexp."""
validate_regexp('Test_123', options=[r'\w']) validators.validate_regexp('Test_123', options=[r'\w'])
self.assertRaises(ParamValidationError, validate_regexp, self.assertRaises(exceptions.ParamValidationError,
'!#$%', options=[r'\w']) validators.validate_regexp, '!#$%', options=[r'\w'])
def test_validate_port(self): def test_validate_port(self):
"""Test packstack.installer.validators.validate_port.""" """Test validate_port."""
validate_port('666') validators.validate_port('666')
self.assertRaises(ParamValidationError, validate_port, 'test') self.assertRaises(exceptions.ParamValidationError,
self.assertRaises(ParamValidationError, validate_port, '-3') validators.validate_port, 'test')
self.assertRaises(exceptions.ParamValidationError,
validators.validate_port, '-3')
def test_validate_not_empty(self): def test_validate_not_empty(self):
"""Test packstack.installer.validators.validate_not_empty.""" """Test validate_not_empty."""
validate_not_empty('test') validators.validate_not_empty('test')
validate_not_empty(False) validators.validate_not_empty(False)
self.assertRaises(ParamValidationError, validate_not_empty, '') self.assertRaises(exceptions.ParamValidationError,
self.assertRaises(ParamValidationError, validate_not_empty, []) validators.validate_not_empty, '')
self.assertRaises(ParamValidationError, validate_not_empty, {}) self.assertRaises(exceptions.ParamValidationError,
validators.validate_not_empty, [])
self.assertRaises(exceptions.ParamValidationError,
validators.validate_not_empty, {})
def test_validate_options(self): def test_validate_options(self):
"""Test packstack.installer.validators.validate_options.""" """Test validate_options."""
validate_options('a', options=['a', 'b']) validators.validate_options('a', options=['a', 'b'])
validate_options('b', options=['a', 'b']) validators.validate_options('b', options=['a', 'b'])
self.assertRaises(ParamValidationError, validate_options, self.assertRaises(
'c', options=['a', 'b']) exceptions.ParamValidationError,
validators.validate_options, 'c', options=['a', 'b'])
def test_validate_ip(self): def test_validate_ip(self):
"""Test packstack.installer.validators.validate_ip.""" """Test validate_ip."""
validate_ip('127.0.0.1') validators.validate_ip('127.0.0.1')
validate_ip('::1') validators.validate_ip('::1')
self.assertRaises(ParamValidationError, validate_ip, 'test') self.assertRaises(exceptions.ParamValidationError,
validators.validate_ip, 'test')
def test_validate_file(self): def test_validate_file(self):
"""Test packstack.installer.validators.validate_file.""" """Test validate_file."""
dname = os.path.join(self.tempdir, '.test_validate_file') dname = os.path.join(self.tempdir, '.test_validate_file')
bad_name = os.path.join(self.tempdir, '.me_no/exists') bad_name = os.path.join(self.tempdir, '.me_no/exists')
os.mkdir(dname) os.mkdir(dname)
validate_writeable_directory(dname) validators.validate_writeable_directory(dname)
self.assertRaises(ParamValidationError, validate_writeable_directory, bad_name) self.assertRaises(
exceptions.ParamValidationError,
validators.validate_writeable_directory, bad_name)
def test_validate_writeable_directory(self): def test_validate_writeable_directory(self):
"""Test packstack.installer.validators.validate_writeable_directory.""" """Test validate_writeable_directory."""
fname = os.path.join(self.tempdir, '.test_validate_writeable_directory') fname = os.path.join(
self.tempdir, '.test_validate_writeable_directory')
bad_name = os.path.join(self.tempdir, '.me_no_exists') bad_name = os.path.join(self.tempdir, '.me_no_exists')
with open(fname, 'w') as f: with open(fname, 'w') as f:
f.write('test') f.write('test')
validate_file(fname) validators.validate_file(fname)
self.assertRaises(ParamValidationError, validate_file, bad_name) self.assertRaises(
exceptions.ParamValidationError,
validators.validate_file, bad_name)
def test_validate_ping(self): def test_validate_ping(self):
"""Test packstack.installer.validators.validate_ping.""" """Test validate_ping."""
# ping to broadcast fails # ping to broadcast fails
self.assertRaises(ParamValidationError, validate_ping, self.assertRaises(exceptions.ParamValidationError,
'255.255.255.255') validators.validate_ping, '255.255.255.255')
def test_validate_ssh(self): def test_validate_ssh(self):
"""Test packstack.installer.validators.validate_ssh.""" """Test validate_ssh."""
# ssh to broadcast fails # ssh to broadcast fails
self.assertRaises(ParamValidationError, validate_ssh, self.assertRaises(exceptions.ParamValidationError,
'255.255.255.255') validators.validate_ssh, '255.255.255.255')
def test_validate_float(self): def test_validate_float(self):
"""Test packstack.installer.validators.validate_float.""" """Test validate_float."""
validate_float('5.3') validators.validate_float('5.3')
self.assertRaises(ParamValidationError, validate_float, 'test') self.assertRaises(exceptions.ParamValidationError,
validators.validate_float, 'test')

View File

@ -15,10 +15,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import shutil
import tempfile
import subprocess
import logging import logging
import shutil
import subprocess
import tempfile
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -122,8 +122,7 @@ class PackstackTestCaseMixin(object):
'Second has %s' % (f, s)) 'Second has %s' % (f, s))
self.assertEqual(f, s, msg=_msg) self.assertEqual(f, s, msg=_msg)
_msg = msg or ('Given lists differ:\n%(list1)s' _msg = msg or f'Given lists differ:\n{list1}\n{list2}'
'\n%(list2)s' % locals())
for i in list1: for i in list1:
if i not in list2: if i not in list2:
raise AssertionError(_msg) raise AssertionError(_msg)
@ -134,8 +133,7 @@ class PackstackTestCaseMixin(object):
'Second has %s' % (f, s)) 'Second has %s' % (f, s))
self.assertEqual(f, s, msg=_msg) self.assertEqual(f, s, msg=_msg)
_msg = msg or ('Given lists differ:\n%(list1)s' _msg = msg or f'Given lists differ:\n{list1}\n{list2}'
'\n%(list2)s' % locals())
for index, item in enumerate(list1): for index, item in enumerate(list1):
if item != list2[index]: if item != list2[index]:
raise AssertionError(_msg) raise AssertionError(_msg)

View File

@ -42,6 +42,6 @@ commands = sphinx-build -a -E -W -d releasenotes/build/doctrees -b html releasen
# E123, E125 skipped as they are invalid PEP-8. # E123, E125 skipped as they are invalid PEP-8.
# #
# All other checks should be enabled in the future. # All other checks should be enabled in the future.
ignore = E123,E125,H803,F403,F405,F821,F811,F841,E501,H302,H303,H304,H306,H405,H404,H305,H307,H501,H201,H101,W503,W504 ignore = E123,E125,E501,H404,H405,H305,H306,H307,W504
show-source = True show-source = True
exclude=.venv,.git,.tox,.eggs exclude=.venv,.git,.tox,.eggs