Files
python-ironicclient/ironicclient/tests/functional/base.py
Dmitry Tantsur cdae0fb045 Log warning when API version is not specified for the ironic tool
At the Pike PTG, an issue was brought up regarding the use of an old API
version in the ironic tool [0]. It was suggested that we begin printing
a warning whenever the default API version is used and later default
to using the latest API version when --ironic-api-version is unspecified.

This patch adds this warning.

[0] https://etherpad.openstack.org/p/ironic-pike-ptg-operations L30

Change-Id: I80d553e4d3b007d8312931019037f495425b5ea5
Partial-Bug: #1671145
2017-07-21 10:50:17 +02:00

446 lines
18 KiB
Python

# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import six
import six.moves.configparser as config_parser
from tempest.lib.cli import base
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions
import ironicclient.tests.functional.utils as utils
DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__), 'test.conf')
class FunctionalTestBase(base.ClientTestBase):
"""Ironic base class, calls to ironicclient."""
def setUp(self):
super(FunctionalTestBase, self).setUp()
self.client = self._get_clients()
# NOTE(kromanenko) set ironic api version for portgroups
self.pg_api_ver = '--ironic-api-version 1.25'
def _get_clients(self):
# NOTE(aarefiev): {toxinidir} is a current working directory, so
# the tox env path is {toxinidir}/.tox
cli_dir = os.path.join(os.path.abspath('.'), '.tox/functional/bin')
config = self._get_config()
if config.get('os_auth_url'):
client = base.CLIClient(cli_dir=cli_dir,
username=config['os_username'],
password=config['os_password'],
tenant_name=config['os_project_name'],
uri=config['os_auth_url'])
for keystone_object in 'user', 'project':
domain_attr = 'os_%s_domain_id' % keystone_object
if config.get(domain_attr):
setattr(self, domain_attr, config[domain_attr])
else:
self.ironic_url = config['ironic_url']
self.os_auth_token = config['os_auth_token']
client = base.CLIClient(cli_dir=cli_dir,
ironic_url=self.ironic_url,
os_auth_token=self.os_auth_token)
return client
def _get_config(self):
config_file = os.environ.get('IRONICCLIENT_TEST_CONFIG',
DEFAULT_CONFIG_FILE)
# SafeConfigParser was deprecated in Python 3.2
if six.PY3:
config = config_parser.ConfigParser()
else:
config = config_parser.SafeConfigParser()
if not config.read(config_file):
self.skipTest('Skipping, no test config found @ %s' % config_file)
try:
auth_strategy = config.get('functional', 'auth_strategy')
except config_parser.NoOptionError:
auth_strategy = 'keystone'
if auth_strategy not in ['keystone', 'noauth']:
raise self.fail(
'Invalid auth type specified: %s in functional must be '
'one of: [keystone, noauth]' % auth_strategy)
conf_settings = []
keystone_v3_conf_settings = []
if auth_strategy == 'keystone':
conf_settings += ['os_auth_url', 'os_username',
'os_password', 'os_project_name']
keystone_v3_conf_settings += ['os_user_domain_id',
'os_project_domain_id']
else:
conf_settings += ['os_auth_token', 'ironic_url']
cli_flags = {}
missing = []
for c in conf_settings + keystone_v3_conf_settings:
try:
cli_flags[c] = config.get('functional', c)
except config_parser.NoOptionError:
# NOTE(vdrok): Here we ignore the absence of KS v3 options as
# v2 may be used. Keystone client will do the actual check of
# the parameters' correctness.
if c not in keystone_v3_conf_settings:
missing.append(c)
if missing:
self.fail('Missing required setting in test.conf (%(conf)s) for '
'auth_strategy=%(auth)s: %(missing)s' %
{'conf': config_file,
'auth': auth_strategy,
'missing': ','.join(missing)})
return cli_flags
def _cmd_no_auth(self, cmd, action, flags='', params=''):
"""Execute given command with noauth attributes.
:param cmd: command to be executed
:type cmd: string
:param action: command on cli to run
:type action: string
:param flags: optional cli flags to use
:type flags: string
:param params: optional positional args to use
:type params: string
"""
flags = ('--os_auth_token %(token)s --ironic_url %(url)s %(flags)s'
%
{'token': self.os_auth_token,
'url': self.ironic_url,
'flags': flags})
return base.execute(cmd, action, flags, params,
cli_dir=self.client.cli_dir)
def _ironic(self, action, flags='', params='', merge_stderr=False):
"""Execute ironic command for the given action.
:param action: the cli command to run using Ironic
:type action: string
:param flags: any optional cli flags to use
:type flags: string
:param params: any optional positional args to use
:type params: string
:param merge_stderr: whether to merge stderr into the result
:type merge_stderr: bool
"""
flags += ' --os-endpoint-type publicURL'
if hasattr(self, 'os_auth_token'):
return self._cmd_no_auth('ironic', action, flags, params)
else:
for keystone_object in 'user', 'project':
domain_attr = 'os_%s_domain_id' % keystone_object
if hasattr(self, domain_attr):
flags += ' --os-%(ks_obj)s-domain-id %(value)s' % {
'ks_obj': keystone_object,
'value': getattr(self, domain_attr)
}
return self.client.cmd_with_auth('ironic',
action, flags, params,
merge_stderr=merge_stderr)
def _ironic_osc(self, action, flags='', params='', merge_stderr=False):
"""Execute baremetal commands via OpenStack Client."""
config = self._get_config()
id_api_version = config.get('functional', 'os_identity_api_version')
flags += ' --os-identity-api-version {0}'.format(id_api_version)
for keystone_object in 'user', 'project':
domain_attr = 'os_%s_domain_id' % keystone_object
if hasattr(self, domain_attr):
flags += ' --os-%(ks_obj)s-domain-id %(value)s' % {
'ks_obj': keystone_object,
'value': getattr(self, domain_attr)
}
return self.client.cmd_with_auth(
'openstack', action, flags, params, merge_stderr=merge_stderr)
def ironic(self, action, flags='', params='', parse=True):
"""Return parsed list of dicts with basic item info.
:param action: the cli command to run using Ironic
:type action: string
:param flags: any optional cli flags to use
:type flags: string
:param params: any optional positional args to use
:type params: string
:param parse: return parsed list or raw output
:type parse: bool
"""
output = self._ironic(action=action, flags=flags, params=params)
return self.parser.listing(output) if parse else output
def get_table_headers(self, action, flags='', params=''):
output = self._ironic(action=action, flags=flags, params=params)
table = self.parser.table(output)
return table['headers']
def assertTableHeaders(self, field_names, table_headers):
"""Assert that field_names and table_headers are equal.
:param field_names: field names from the output table of the cmd
:param table_headers: table headers output from cmd
"""
self.assertEqual(sorted(field_names), sorted(table_headers))
def assertNodeStates(self, node_show, node_show_states):
"""Assert that node_show_states output corresponds to node_show output.
:param node_show: output from node-show cmd
:param node_show_states: output from node-show-states cmd
"""
for key in node_show_states.keys():
self.assertEqual(node_show_states[key], node_show[key])
def assertNodeValidate(self, node_validate):
"""Assert that all interfaces present are valid.
:param node_validate: output from node-validate cmd
"""
self.assertNotIn('False', [x['Result'] for x in node_validate])
def delete_node(self, node_id):
"""Delete node method works only with fake driver.
:param node_id: node uuid
:raises: CommandFailed exception when command fails to delete a node
"""
node_list = self.list_nodes()
if utils.get_object(node_list, node_id):
node_show = self.show_node(node_id)
if node_show['provision_state'] != 'available':
self.ironic('node-set-provision-state',
params='{0} deleted'.format(node_id))
if node_show['power_state'] not in ('None', 'off'):
self.ironic('node-set-power-state',
params='{0} off'.format(node_id))
self.ironic('node-delete', params=node_id)
node_list_uuid = self.get_nodes_uuids_from_node_list()
if node_id in node_list_uuid:
self.fail('Ironic node {0} has not been deleted!'
.format(node_id))
def create_node(self, driver='fake', params=''):
node = self.ironic('node-create',
params='--driver {0} {1}'.format(driver, params))
if not node:
self.fail('Ironic node has not been created!')
node = utils.get_dict_from_output(node)
self.addCleanup(self.delete_node, node['uuid'])
return node
def show_node(self, node_id, params=''):
node_show = self.ironic('node-show',
params='{0} {1}'.format(node_id, params))
return utils.get_dict_from_output(node_show)
def list_nodes(self, params=''):
return self.ironic('node-list', params=params)
def update_node(self, node_id, params):
updated_node = self.ironic('node-update',
params='{0} {1}'.format(node_id, params))
return utils.get_dict_from_output(updated_node)
def get_nodes_uuids_from_node_list(self):
node_list = self.list_nodes()
return [x['UUID'] for x in node_list]
def show_node_states(self, node_id):
show_node_states = self.ironic('node-show-states', params=node_id)
return utils.get_dict_from_output(show_node_states)
def set_node_maintenance(self, node_id, maintenance_mode, params=''):
self.ironic(
'node-set-maintenance',
params='{0} {1} {2}'.format(node_id, maintenance_mode, params))
def set_node_power_state(self, node_id, power_state, params=''):
self.ironic('node-set-power-state',
params='{0} {1} {2}'
.format(node_id, power_state, params))
def set_node_provision_state(self, node_id, provision_state, params=''):
self.ironic('node-set-provision-state',
params='{0} {1} {2}'
.format(node_id, provision_state, params))
def validate_node(self, node_id):
return self.ironic('node-validate', params=node_id)
def list_node_chassis(self, chassis_uuid, params=''):
return self.ironic('chassis-node-list',
params='{0} {1}'.format(chassis_uuid, params))
def get_nodes_uuids_from_chassis_node_list(self, chassis_uuid):
chassis_node_list = self.list_node_chassis(chassis_uuid)
return [x['UUID'] for x in chassis_node_list]
def list_driver(self, params=''):
return self.ironic('driver-list', params=params)
def show_driver(self, driver_name):
driver_show = self.ironic('driver-show', params=driver_name)
return utils.get_dict_from_output(driver_show)
def properties_driver(self, driver_name):
return self.ironic('driver-properties', params=driver_name)
def get_drivers_names(self):
driver_list = self.list_driver()
return [x['Supported driver(s)'] for x in driver_list]
def delete_chassis(self, chassis_id, ignore_exceptions=False):
try:
self.ironic('chassis-delete', params=chassis_id)
except exceptions.CommandFailed:
if not ignore_exceptions:
raise
def get_chassis_uuids_from_chassis_list(self):
chassis_list = self.list_chassis()
return [x['UUID'] for x in chassis_list]
def create_chassis(self, params=''):
chassis = self.ironic('chassis-create', params=params)
if not chassis:
self.fail('Ironic chassis has not been created!')
chassis = utils.get_dict_from_output(chassis)
self.addCleanup(self.delete_chassis,
chassis['uuid'],
ignore_exceptions=True)
return chassis
def list_chassis(self, params=''):
return self.ironic('chassis-list', params=params)
def show_chassis(self, chassis_id, params=''):
chassis_show = self.ironic('chassis-show',
params='{0} {1}'.format(chassis_id, params))
return utils.get_dict_from_output(chassis_show)
def update_chassis(self, chassis_id, operation, params=''):
updated_chassis = self.ironic(
'chassis-update',
params='{0} {1} {2}'.format(chassis_id, operation, params))
return utils.get_dict_from_output(updated_chassis)
def delete_port(self, port_id, ignore_exceptions=False):
try:
self.ironic('port-delete', params=port_id)
except exceptions.CommandFailed:
if not ignore_exceptions:
raise
def create_port(self,
node_id,
mac_address=None,
flags='',
params=''):
if mac_address is None:
mac_address = data_utils.rand_mac_address()
port = self.ironic('port-create',
flags=flags,
params='--address {0} --node {1} {2}'
.format(mac_address, node_id, params))
if not port:
self.fail('Ironic port has not been created!')
return utils.get_dict_from_output(port)
def list_ports(self, params=''):
return self.ironic('port-list', params=params)
def show_port(self, port_id, params=''):
port_show = self.ironic('port-show', params='{0} {1}'
.format(port_id, params))
return utils.get_dict_from_output(port_show)
def get_uuids_from_port_list(self):
port_list = self.list_ports()
return [x['UUID'] for x in port_list]
def update_port(self, port_id, operation, flags='', params=''):
updated_port = self.ironic('port-update',
flags=flags,
params='{0} {1} {2}'
.format(port_id, operation, params))
return utils.get_dict_from_output(updated_port)
def create_portgroup(self, node_id, params=''):
"""Create a new portgroup."""
portgroup = self.ironic('portgroup-create',
flags=self.pg_api_ver,
params='--node {0} {1}'
.format(node_id, params))
if not portgroup:
self.fail('Ironic portgroup failed to create!')
portgroup = utils.get_dict_from_output(portgroup)
self.addCleanup(self.delete_portgroup, portgroup['uuid'],
ignore_exceptions=True)
return portgroup
def delete_portgroup(self, portgroup_id, ignore_exceptions=False):
"""Delete a port group."""
try:
self.ironic('portgroup-delete',
flags=self.pg_api_ver,
params=portgroup_id)
except exceptions.CommandFailed:
if not ignore_exceptions:
raise
def list_portgroups(self, params=''):
"""List the port groups."""
return self.ironic('portgroup-list',
flags=self.pg_api_ver,
params=params)
def show_portgroup(self, portgroup_id, params=''):
"""Show detailed information about a port group."""
portgroup_show = self.ironic('portgroup-show',
flags=self.pg_api_ver,
params='{0} {1}'
.format(portgroup_id, params))
return utils.get_dict_from_output(portgroup_show)
def update_portgroup(self, portgroup_id, op, params=''):
"""Update information about a port group."""
updated_portgroup = self.ironic('portgroup-update',
flags=self.pg_api_ver,
params='{0} {1} {2}'
.format(portgroup_id, op, params))
return utils.get_dict_from_output(updated_portgroup)
def get_portgroup_uuids_from_portgroup_list(self):
"""Get UUIDs from list of port groups."""
portgroup_list = self.list_portgroups()
return [x['UUID'] for x in portgroup_list]
def portgroup_port_list(self, portgroup_id, params=''):
"""List the ports associated with a port group."""
return self.ironic('portgroup-port-list', flags=self.pg_api_ver,
params='{0} {1}'.format(portgroup_id, params))