Refactor libvirt autodection and use oslo_config
List of changes: * using oslo_config to get nova configuration * adjusted _detect body to match changes done recently for other plugins Extra: * added utility method to to load oslo_configuration for any OpenStack project using it. * removed json, time from required dependencies (core python libs) * removed libvirt inspector from required dependencies (part of an agent itself) * removed netaddr from required dependencies (part of agent's requirements) * in overall tried to introduce some order into libvirt code Story: 2000999 Task: 4623 Story: 2001054 Task: 4655 Change-Id: Iaac56cf96f710659908d23dc55831be7dac30e0a
This commit is contained in:
parent
77d4ed080e
commit
f06611573a
@ -63,7 +63,8 @@ class Plugin(object):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def literal_eval(self, testval):
|
||||
@staticmethod
|
||||
def literal_eval(testval):
|
||||
"""Return a literal boolean value if applicable
|
||||
|
||||
"""
|
||||
|
@ -1,7 +1,6 @@
|
||||
# (c) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
|
||||
# Copyright 2017 Fujitsu LIMITED
|
||||
|
||||
import ConfigParser
|
||||
import grp
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
@ -9,9 +8,13 @@ from shutil import copy
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import importutils
|
||||
|
||||
from monasca_agent.common.psutil_wrapper import psutil
|
||||
import monasca_setup.agent_config
|
||||
from monasca_setup.detection import Plugin
|
||||
from monasca_setup import agent_config
|
||||
from monasca_setup.detection import plugin
|
||||
from monasca_setup.detection import utils
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -47,157 +50,195 @@ default_vnic_collection_period = 0
|
||||
INT_ARGS = ['disk_collection_period', 'vnic_collection_period',
|
||||
'max_ping_concurrency', 'nova_refresh', 'vm_probation']
|
||||
|
||||
_REQUIRED_OPTS = [
|
||||
{'opt': cfg.StrOpt('username'), 'group': 'keystone_authtoken'},
|
||||
{'opt': cfg.StrOpt('password'), 'group': 'keystone_authtoken'},
|
||||
{'opt': cfg.StrOpt('project_name'), 'group': 'keystone_authtoken'},
|
||||
{'opt': cfg.StrOpt('auth_url'), 'group': 'keystone_authtoken'}
|
||||
]
|
||||
"""Nova configuration opts required by this plugin"""
|
||||
|
||||
class Libvirt(Plugin):
|
||||
|
||||
class Libvirt(plugin.Plugin):
|
||||
"""Configures VM monitoring through Nova"""
|
||||
|
||||
FAILED_DETECTION_MSG = 'libvirt plugin will not not be configured.'
|
||||
|
||||
def _detect(self):
|
||||
"""Set self.available True if the process and config file are detected
|
||||
"""
|
||||
# Detect Agent's OS username by getting the group owner of config file
|
||||
try:
|
||||
gid = os.stat('/etc/monasca/agent/agent.yaml').st_gid
|
||||
self.agent_user = grp.getgrgid(gid)[0]
|
||||
except OSError:
|
||||
self.agent_user = None
|
||||
# Try to detect the location of the Nova configuration file.
|
||||
# Walk through the list of processes, searching for 'nova-compute'
|
||||
# process with 'nova.conf' inside one of the parameters
|
||||
nova_conf = None
|
||||
for proc in psutil.process_iter():
|
||||
try:
|
||||
cmd = proc.as_dict(['cmdline'])['cmdline']
|
||||
if len(cmd) > 2 and 'python' in cmd[0] and 'nova-compute' in cmd[1]:
|
||||
conf_indexes = [cmd.index(y) for y in cmd if 'nova.conf' in y]
|
||||
if not conf_indexes:
|
||||
if os.path.exists('/etc/nova/nova.conf'):
|
||||
nova_conf = "/etc/nova/nova.conf"
|
||||
continue
|
||||
param = conf_indexes[0]
|
||||
if '=' in cmd[param]:
|
||||
nova_conf = cmd[param].split('=')[1]
|
||||
else:
|
||||
nova_conf = cmd[param]
|
||||
except (IOError, psutil.NoSuchProcess):
|
||||
# Process has already terminated, ignore
|
||||
continue
|
||||
if (nova_conf is not None and os.path.isfile(nova_conf)):
|
||||
self.available = True
|
||||
|
||||
# NOTE(trebskit) bind each check we execute to another one
|
||||
# that way if X-one fails following won't be executed
|
||||
# and detection phase will end faster
|
||||
nova_proc = utils.find_process_name('nova-compute')
|
||||
has_deps = self.dependencies_installed() if nova_proc else None
|
||||
nova_conf = self._find_nova_conf(nova_proc) if has_deps else None
|
||||
has_cache_dir = self._has_cache_dir() if nova_conf else None
|
||||
agent_user = self._get_agent_username() if has_cache_dir else None
|
||||
|
||||
self.available = nova_conf and has_cache_dir
|
||||
if not self.available:
|
||||
if not nova_proc:
|
||||
detailed_message = '\tnova-compute process not found.'
|
||||
log.info('%s\n%s' % (detailed_message,
|
||||
self.FAILED_DETECTION_MSG))
|
||||
elif not has_deps:
|
||||
detailed_message = ('\tRequired dependencies were not found.\n'
|
||||
'Run pip install monasca-agent[libvirt] '
|
||||
'to install all dependencies.')
|
||||
log.warning('%s\n%s' % (detailed_message,
|
||||
self.FAILED_DETECTION_MSG))
|
||||
elif not has_cache_dir:
|
||||
detailed_message = '\tCache directory %s not found' % cache_dir
|
||||
log.warning('%s\n%s' % (detailed_message,
|
||||
self.FAILED_DETECTION_MSG))
|
||||
elif not nova_conf:
|
||||
detailed_message = ('\tnova-compute process was found, '
|
||||
'but it was impossible to '
|
||||
'read it\'s configuration.')
|
||||
log.warning('%s\n%s' % (detailed_message,
|
||||
self.FAILED_DETECTION_MSG))
|
||||
else:
|
||||
self.nova_conf = nova_conf
|
||||
self._agent_user = agent_user
|
||||
|
||||
def build_config(self):
|
||||
"""Build the config as a Plugins object and return back.
|
||||
"""
|
||||
config = monasca_setup.agent_config.Plugins()
|
||||
config = agent_config.Plugins()
|
||||
init_config = self._get_init_config()
|
||||
|
||||
if self.dependencies_installed():
|
||||
nova_cfg = ConfigParser.SafeConfigParser()
|
||||
log.info("\tUsing nova configuration file {0}".format(self.nova_conf))
|
||||
nova_cfg.read(self.nova_conf)
|
||||
# Which configuration options are needed for the plugin YAML?
|
||||
# Use a dict so that they can be renamed later if necessary
|
||||
cfg_needed = {'username': 'username',
|
||||
'password': 'password',
|
||||
'project_name': 'project_name'}
|
||||
cfg_section = 'keystone_authtoken'
|
||||
self._configure_ping(init_config)
|
||||
|
||||
# Start with plugin-specific configuration parameters
|
||||
init_config = {'cache_dir': cache_dir,
|
||||
'nova_refresh': nova_refresh,
|
||||
'vm_probation': vm_probation,
|
||||
'metadata': metadata,
|
||||
'customer_metadata': customer_metadata,
|
||||
'max_ping_concurrency': default_max_ping_concurrency,
|
||||
'disk_collection_period': default_disk_collection_period,
|
||||
'vnic_collection_period': default_vnic_collection_period}
|
||||
# Handle monasca-setup detection arguments, which take precedence
|
||||
if self.args:
|
||||
for arg in self.args:
|
||||
if arg in INT_ARGS:
|
||||
value = self.args[arg]
|
||||
try:
|
||||
init_config[arg] = int(value)
|
||||
except ValueError:
|
||||
log.warn("\tInvalid integer value '{0}' for parameter {1}, ignoring value"
|
||||
.format(value, arg))
|
||||
else:
|
||||
init_config[arg] = self.literal_eval(self.args[arg])
|
||||
|
||||
# Set default parameters for included checks
|
||||
init_config['vm_cpu_check_enable'] = self.literal_eval('True')
|
||||
init_config['vm_disks_check_enable'] = self.literal_eval('True')
|
||||
init_config['vm_network_check_enable'] = self.literal_eval('True')
|
||||
init_config['vm_ping_check_enable'] = self.literal_eval('True')
|
||||
init_config['vm_extended_disks_check_enable'] = self.literal_eval('False')
|
||||
|
||||
for option in cfg_needed:
|
||||
init_config[option] = nova_cfg.get(cfg_section, cfg_needed[option])
|
||||
|
||||
# Create an identity URI (again, slightly different for Devstack)
|
||||
if nova_cfg.has_option(cfg_section, 'auth_url'):
|
||||
init_config['auth_url'] = nova_cfg.get(cfg_section, 'auth_url')
|
||||
else:
|
||||
init_config['auth_url'] = nova_cfg.get(cfg_section, 'identity_uri')
|
||||
|
||||
# Verify requirements to enable ping checks
|
||||
init_config['ping_check'] = self.literal_eval('False')
|
||||
if self.agent_user is None:
|
||||
log.warn("\tUnable to determine agent user. Skipping ping checks.")
|
||||
else:
|
||||
try:
|
||||
from neutronclient.v2_0 import client
|
||||
|
||||
# Copy system 'ip' command to local directory
|
||||
copy(ip_cmd, sys.path[0])
|
||||
# Restrict permissions on the local 'ip' command
|
||||
os.chown("{0}/ip".format(sys.path[0]), pwd.getpwnam(self.agent_user).pw_uid, 0)
|
||||
os.chmod("{0}/ip".format(sys.path[0]), 0o700)
|
||||
# Set capabilities on 'ip' which will allow
|
||||
# self.agent_user to exec commands in namespaces
|
||||
setcap_cmd = ['/sbin/setcap', 'cap_sys_admin+ep',
|
||||
"{0}/ip".format(sys.path[0])]
|
||||
subprocess.Popen(setcap_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
# Verify that the capabilities were set
|
||||
setcap_cmd.extend(['-v', '-q'])
|
||||
subprocess.check_call(setcap_cmd)
|
||||
# Look for the best ping command
|
||||
for ping_cmd in ping_options:
|
||||
if os.path.isfile(ping_cmd[0]):
|
||||
init_config['ping_check'] = "{0}/ip netns exec NAMESPACE {1}".format(sys.path[0],
|
||||
' '.join(ping_cmd))
|
||||
log.info("\tEnabling ping checks using {0}".format(ping_cmd[0]))
|
||||
break
|
||||
if init_config['ping_check'] is False:
|
||||
log.warn("\tUnable to find suitable ping command, disabling ping checks.")
|
||||
except ImportError:
|
||||
log.warn("\tneutronclient module missing, required for ping checks.")
|
||||
pass
|
||||
except IOError:
|
||||
log.warn("\tUnable to copy {0}, ping checks disabled.".format(ip_cmd))
|
||||
pass
|
||||
except (subprocess.CalledProcessError, OSError):
|
||||
log.warn("\tUnable to set up ping checks, setcap failed ({0})".format(' '.join(setcap_cmd)))
|
||||
pass
|
||||
|
||||
# Handle monasca-setup detection arguments, which take precedence
|
||||
if self.args:
|
||||
for arg in self.args:
|
||||
if arg in INT_ARGS:
|
||||
value = self.args[arg]
|
||||
try:
|
||||
init_config[arg] = int(value)
|
||||
except ValueError:
|
||||
log.warn("\tInvalid integer value '{0}' for parameter {1}, ignoring value"
|
||||
.format(value, arg))
|
||||
else:
|
||||
init_config[arg] = self.literal_eval(self.args[arg])
|
||||
|
||||
config['libvirt'] = {'init_config': init_config,
|
||||
'instances': []}
|
||||
config['libvirt'] = {'init_config': init_config,
|
||||
'instances': []}
|
||||
|
||||
return config
|
||||
|
||||
def dependencies_installed(self):
|
||||
try:
|
||||
import json
|
||||
import monasca_agent.collector.virt.inspector
|
||||
import time
|
||||
def _configure_ping(self, init_config):
|
||||
if self._agent_user is None:
|
||||
log.warn("\tUnable to determine agent user. Skipping ping checks.")
|
||||
return
|
||||
|
||||
from netaddr import all_matching_cidrs
|
||||
from novaclient import client
|
||||
except ImportError:
|
||||
log.warn("\tDependencies not satisfied; plugin not configured.")
|
||||
return False
|
||||
if os.path.isdir(cache_dir) is False:
|
||||
log.warn("\tCache directory {} not found;" +
|
||||
" plugin not configured.".format(cache_dir))
|
||||
return False
|
||||
return True
|
||||
try:
|
||||
client = importutils.try_import('neutronclient.v2_0.client',
|
||||
False)
|
||||
if not client:
|
||||
log.warning(
|
||||
'\tpython-neutronclient module missing, '
|
||||
'required for ping checks.')
|
||||
return
|
||||
|
||||
# Copy system 'ip' command to local directory
|
||||
copy(ip_cmd, sys.path[0])
|
||||
|
||||
# Restrict permissions on the local 'ip' command
|
||||
os.chown("{0}/ip".format(sys.path[0]),
|
||||
*self._get_user_uid_gid(self._agent_user))
|
||||
os.chmod("{0}/ip".format(sys.path[0]),
|
||||
0o700)
|
||||
|
||||
# Set capabilities on 'ip' which will allow
|
||||
# self.agent_user to exec commands in namespaces
|
||||
setcap_cmd = ['/sbin/setcap', 'cap_sys_admin+ep',
|
||||
"{0}/ip".format(sys.path[0])]
|
||||
subprocess.Popen(setcap_cmd, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
# Verify that the capabilities were set
|
||||
setcap_cmd.extend(['-v', '-q'])
|
||||
subprocess.check_call(setcap_cmd)
|
||||
# Look for the best ping command
|
||||
for ping_cmd in ping_options:
|
||||
if os.path.isfile(ping_cmd[0]):
|
||||
init_config[
|
||||
'ping_check'] = "{0}/ip netns exec NAMESPACE {1}".format(
|
||||
sys.path[0],
|
||||
' '.join(ping_cmd))
|
||||
log.info(
|
||||
"\tEnabling ping checks using {0}".format(ping_cmd[0]))
|
||||
init_config['ping_check'] = True
|
||||
break
|
||||
if init_config['ping_check'] is False:
|
||||
log.warn('\tUnable to find suitable ping command, '
|
||||
'disabling ping checks.')
|
||||
except IOError:
|
||||
log.warn('\tUnable to copy {0}, '
|
||||
'ping checks disabled.'.format(ip_cmd))
|
||||
pass
|
||||
except (subprocess.CalledProcessError, OSError):
|
||||
log.warn('\tUnable to set up ping checks, '
|
||||
'setcap failed ({0})'.format(' '.join(setcap_cmd)))
|
||||
pass
|
||||
|
||||
def dependencies_installed(self):
|
||||
return importutils.try_import('novaclient.client', False)
|
||||
|
||||
def _get_init_config(self):
|
||||
keystone_auth_section = self.nova_conf['keystone_authtoken']
|
||||
init_config = {
|
||||
'cache_dir': cache_dir,
|
||||
'nova_refresh': nova_refresh,
|
||||
'metadata': metadata,
|
||||
'vm_probation': vm_probation,
|
||||
'customer_metadata': customer_metadata,
|
||||
'max_ping_concurrency': default_max_ping_concurrency,
|
||||
'disk_collection_period': default_disk_collection_period,
|
||||
'vnic_collection_period': default_vnic_collection_period,
|
||||
'vm_cpu_check_enable': True,
|
||||
'vm_disks_check_enable': True,
|
||||
'vm_network_check_enable': True,
|
||||
'vm_ping_check_enable': True,
|
||||
'vm_extended_disks_check_enable': False,
|
||||
'ping_check': False,
|
||||
'username': keystone_auth_section['username'],
|
||||
'password': keystone_auth_section['password'],
|
||||
'project_name': keystone_auth_section['project_name'],
|
||||
'auth_url': keystone_auth_section['auth_url']
|
||||
}
|
||||
return init_config
|
||||
|
||||
@staticmethod
|
||||
def _has_cache_dir():
|
||||
return os.path.isdir(cache_dir)
|
||||
|
||||
@staticmethod
|
||||
def _get_agent_username():
|
||||
agent_user = None
|
||||
try:
|
||||
uid = os.stat('/etc/monasca/agent/agent.yaml').st_uid
|
||||
agent_user = pwd.getpwuid(uid).pw_name
|
||||
except OSError:
|
||||
log.exception('Failed to retrieve agent\'s username')
|
||||
return agent_user
|
||||
|
||||
@staticmethod
|
||||
def _find_nova_conf(nova_process):
|
||||
try:
|
||||
nova_cmd = nova_process.as_dict(['cmdline'])['cmdline']
|
||||
return utils.load_oslo_configuration(from_cmd=nova_cmd,
|
||||
in_project='nova',
|
||||
for_opts=_REQUIRED_OPTS)
|
||||
except cfg.Error:
|
||||
log.exception('Failed to load nova configuration')
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _get_user_uid_gid(username):
|
||||
stat = pwd.getpwnam(username)
|
||||
uid = stat.pw_uid
|
||||
gid = stat.pw_gid
|
||||
return uid, gid
|
||||
|
@ -7,6 +7,8 @@ from subprocess import CalledProcessError
|
||||
from subprocess import PIPE
|
||||
from subprocess import Popen
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from monasca_agent.common.psutil_wrapper import psutil
|
||||
from monasca_setup import agent_config
|
||||
|
||||
@ -89,6 +91,66 @@ def find_addr_listening_on_port_over_tcp(port):
|
||||
return ip[0].lstrip("::ffff:")
|
||||
|
||||
|
||||
# NOTE(trebskit) a little poetry never hurt anyone before...right ?
|
||||
def load_oslo_configuration(from_cmd, in_project,
|
||||
for_opts, of_prog=None):
|
||||
"""Loads configuration of an OpenStack project.
|
||||
|
||||
for_opts should be a :py:class:`list` containing dictionaries
|
||||
with keys as expected by :py:class:meth:`cfg.ConfigOpts.register_opt`::
|
||||
|
||||
>>> for_opts = [
|
||||
>>> {'opt': cfg.StrOpt('region_name')},
|
||||
>>> {'opt': cfg.StrOpt('username'), 'group': 'keystoneauth'},
|
||||
>>> {'opt': cfg.StrOpt('password'), 'group': 'keystoneauth'},
|
||||
>>> ]
|
||||
|
||||
Example::
|
||||
|
||||
>>> nova_proc = find_process_name('nova-compute')
|
||||
>>> proc_cmd = nova_proc.as_dict(['cmdline'])['cmdline']
|
||||
>>> load_oslo_configuration(
|
||||
>>> from_cmd=proc_cmd,
|
||||
>>> in_project='nova',
|
||||
>>> for_opts=for_opts
|
||||
>>> )
|
||||
|
||||
which will load three [region_name, username and password] settings from
|
||||
Nova configuration regardless of where those
|
||||
settings are actually defined.
|
||||
|
||||
:param from_cmd: cmdline of a process, used also to retrieve arguments
|
||||
:type from_cmd: list[basestring]
|
||||
:param in_project: the project name as defined in its oslo setup
|
||||
:type in_project: basestring
|
||||
:param for_opts: list of dict containing options to look for inside config
|
||||
:type for_opts: list[dict]
|
||||
:param of_prog: program name within the project [optional]
|
||||
:type of_prog: basestring
|
||||
:return: oslo configuration object
|
||||
:rtype: oslo_config.cfg.CONF
|
||||
"""
|
||||
|
||||
conf_holder = cfg.ConfigOpts()
|
||||
for no in for_opts:
|
||||
conf_holder.register_opt(**no)
|
||||
|
||||
# NOTE(trebskit) we need to remove everything from the beginning
|
||||
# of the cmd arg list that is not an argument of the application
|
||||
# we want to get configuration from, i.e.;
|
||||
# /usr/bin/python, /usr/bin/python3
|
||||
# and next actual binary of the program
|
||||
# /usr/local/bin/nova-compute
|
||||
args = from_cmd[2:]
|
||||
conf_holder(
|
||||
args=args,
|
||||
project=in_project,
|
||||
prog=of_prog
|
||||
)
|
||||
|
||||
return conf_holder
|
||||
|
||||
|
||||
def watch_process(search_strings, service=None, component=None,
|
||||
exact_match=True, detailed=True, process_name=None, dimensions=None):
|
||||
"""Takes a list of process search strings and returns a Plugins object with the config set.
|
||||
|
@ -43,12 +43,16 @@ monasca_agent.collector.virt =
|
||||
# list of extra dependencies that are required by some plugin
|
||||
# for details see #PEP0426
|
||||
[extras]
|
||||
d_influxdb =
|
||||
influxdb =
|
||||
toml
|
||||
d_influxdb_relay =
|
||||
influxdb_relay =
|
||||
toml
|
||||
kafka_plugin =
|
||||
monasca-common >= 1.4.0
|
||||
libvirt =
|
||||
libvirt-python>=1.2.5
|
||||
python-novaclient>=7.1.0
|
||||
python-neutronclient>=6.2.0,!=6.3.0
|
||||
|
||||
[global]
|
||||
setup-hooks =
|
||||
|
125
tests/detection/test_utils.py
Normal file
125
tests/detection/test_utils.py
Normal file
@ -0,0 +1,125 @@
|
||||
# Copyright 2017 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from oslotest import base
|
||||
from oslo_config import cfg
|
||||
|
||||
from monasca_setup.detection import utils
|
||||
|
||||
|
||||
class TestDetectionUtilsOsloConf(base.BaseTestCase):
|
||||
PROJECT = 'foo'
|
||||
PROG = 'bar'
|
||||
|
||||
@mock.patch('monasca_setup.detection.utils.cfg.ConfigOpts')
|
||||
def test_load_oslo_configuration_no_args(self, config_opts):
|
||||
config_opts.return_value = co = mock.Mock()
|
||||
opts = [
|
||||
{'opt': cfg.StrOpt('region_name')}
|
||||
]
|
||||
args = ['python', 'foo-api']
|
||||
|
||||
self._run_load_oslo_test(co, opts, args)
|
||||
|
||||
@mock.patch('monasca_setup.detection.utils.cfg.ConfigOpts')
|
||||
def test_load_oslo_configuration_with_args(self, config_opts):
|
||||
config_opts.return_value = co = mock.Mock()
|
||||
opts = [
|
||||
{'opt': cfg.StrOpt('region_name')}
|
||||
]
|
||||
args = ['python', 'foo-api', '--config-dir', '/foo/bar',
|
||||
'--config-dir', '/tmp/foo']
|
||||
|
||||
self._run_load_oslo_test(co, opts, args)
|
||||
|
||||
def test_should_create_new_oslo_conf_for_each_call(self):
|
||||
# test ensures that each call for load_oslo_configuration
|
||||
# creates new object of oslo_config.ConfigOpts
|
||||
cfg_1 = utils.load_oslo_configuration(
|
||||
from_cmd=[],
|
||||
in_project=self.PROJECT,
|
||||
of_prog=self.PROG,
|
||||
for_opts=[]
|
||||
)
|
||||
cfg_2 = utils.load_oslo_configuration(
|
||||
from_cmd=[],
|
||||
in_project=self.PROJECT,
|
||||
of_prog=self.PROG,
|
||||
for_opts=[]
|
||||
)
|
||||
|
||||
self.assertIsNot(cfg_1, cfg_2)
|
||||
|
||||
def test_distinct_oslo_confs_should_contain_different_opts(self):
|
||||
# test ensures that each instance created via load_oslo_configuration
|
||||
# contains different values of the same opts
|
||||
|
||||
cmd_1 = ['python', 'test', '--foo', '1']
|
||||
cmd_2 = ['python', 'test', '--foo', '2']
|
||||
|
||||
opts = [
|
||||
{
|
||||
'opt': cfg.IntOpt(name='foo', default=-1),
|
||||
'cli': True
|
||||
}
|
||||
]
|
||||
|
||||
cfg_1 = utils.load_oslo_configuration(
|
||||
from_cmd=cmd_1,
|
||||
in_project=self.PROJECT,
|
||||
of_prog=self.PROG,
|
||||
for_opts=opts
|
||||
)
|
||||
cfg_2 = utils.load_oslo_configuration(
|
||||
from_cmd=cmd_2,
|
||||
in_project=self.PROJECT,
|
||||
of_prog=self.PROG,
|
||||
for_opts=opts
|
||||
)
|
||||
cfg_3 = utils.load_oslo_configuration(
|
||||
from_cmd=[],
|
||||
in_project=self.PROJECT,
|
||||
of_prog=self.PROG,
|
||||
for_opts=opts
|
||||
)
|
||||
|
||||
self.assertIsNot(cfg_1, cfg_2)
|
||||
self.assertIsNot(cfg_2, cfg_3)
|
||||
self.assertIsNot(cfg_1, cfg_3)
|
||||
|
||||
self.assertNotEqual(cfg_1.foo, cfg_2.foo)
|
||||
self.assertNotEqual(cfg_2.foo, cfg_3.foo)
|
||||
self.assertNotEqual(cfg_1.foo, cfg_3.foo)
|
||||
|
||||
def _run_load_oslo_test(self, config_opts, opts, args):
|
||||
actual_args = args[2:]
|
||||
|
||||
conf = utils.load_oslo_configuration(
|
||||
from_cmd=args,
|
||||
in_project=self.PROJECT,
|
||||
of_prog=self.PROG,
|
||||
for_opts=opts
|
||||
)
|
||||
|
||||
self.assertIsNotNone(conf)
|
||||
|
||||
for opt in opts:
|
||||
config_opts.register_opt.assert_called_once_with(**opt)
|
||||
config_opts.assert_called_once_with(
|
||||
args=actual_args,
|
||||
project=self.PROJECT,
|
||||
prog=self.PROG
|
||||
)
|
Loading…
Reference in New Issue
Block a user