Merge "Add iBoot driver"

This commit is contained in:
Jenkins 2016-08-12 15:15:18 +00:00 committed by Gerrit Code Review
commit 5ed5ff4d99
12 changed files with 953 additions and 0 deletions

View File

@ -9,3 +9,4 @@ Available drivers
drivers/wol
drivers/amt
drivers/iboot

View File

@ -0,0 +1,76 @@
.. _IBOOT:
============
iBoot driver
============
Overview
========
The iBoot power driver enables you to take advantage of power cycle
management of nodes using Dataprobe iBoot devices over the DxP protocol.
Drivers
=======
There are two iboot drivers:
* The ``pxe_iboot_iscsi`` driver uses iBoot to control the power state of the
node, PXE/iPXE technology for booting and the iSCSI methodology for
deploying the node.
* The ``pxe_iboot_agent`` driver uses iBoot to control the power state of the
node, PXE/iPXE technology for booting and the Ironic Python Agent for
deploying an image to the node.
Requirements
~~~~~~~~~~~~
* ``python-iboot`` library should be installed - https://github.com/darkip/python-iboot
Tested platforms
~~~~~~~~~~~~~~~~
* iBoot-G2
Configuring and enabling the driver
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1. Add ``pxe_iboot_iscsi`` and/or ``pxe_iboot_agent`` to the list of
``enabled_drivers`` in */etc/ironic/ironic.conf*. For example::
[DEFAULT]
...
enabled_drivers = pxe_iboot_iscsi,pxe_iboot_agent
2. Restart the Ironic conductor service::
service ironic-conductor restart
Registering a node with the iBoot driver
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Nodes configured for the iBoot driver should have the ``driver`` property
set to ``pxe_iboot_iscsi`` or ``pxe_iboot_agent``.
The following configuration values are also required in ``driver_info``:
- ``iboot_address``: The IP address of the iBoot PDU.
- ``iboot_username``: User name used for authentication.
- ``iboot_password``: Password used for authentication.
In addition, there are optional properties in ``driver_info``:
- ``iboot_port``: iBoot PDU port. Defaults to 9100.
- ``iboot_relay_id``: iBoot PDU relay ID. This option is useful in order
to support multiple nodes attached to a single PDU. Defaults to 1.
The following sequence of commands can be used to enroll a node with
the iBoot driver.
1. Create node::
ironic node-create -d pxe_iboot_iscsi -i iboot_username=<username> -i iboot_password=<password> -i iboot_address=<address>
References
==========
.. [1] iBoot-G2 official documentation - http://dataprobe.com/support_iboot-g2.html

View File

@ -0,0 +1,81 @@
# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ironic.common import exception as ironic_exception
from ironic.common.i18n import _
from ironic.drivers import base
from ironic.drivers.modules import agent
from ironic.drivers.modules import fake
from ironic.drivers.modules import iscsi_deploy
from ironic.drivers.modules import pxe
from oslo_utils import importutils
from ironic_staging_drivers.iboot import power as iboot_power
class FakeIBootFakeDriver(base.BaseDriver):
"""Fake iBoot driver."""
def __init__(self):
if not importutils.try_import('iboot'):
raise ironic_exception.DriverLoadError(
driver=self.__class__.__name__,
reason=_("Unable to import iboot library"))
self.boot = fake.FakeBoot()
self.power = iboot_power.IBootPower()
self.deploy = fake.FakeDeploy()
class PXEIBootISCSIDriver(base.BaseDriver):
"""PXE + IBoot PDU driver + iSCSI driver.
This driver implements the `core` functionality, combining
:class:`ironic.drivers.modules.pxe.PXEBoot` for boot and
:class:`ironic_staging_drivers.iboot.power.IBootPower` for power
and :class:`ironic.drivers.modules.iscsi_deploy.ISCSIDeploy` for
image deployment. Implementations are in those respective classes;
this class is merely the glue between them.
"""
def __init__(self):
if not importutils.try_import('iboot'):
raise ironic_exception.DriverLoadError(
driver=self.__class__.__name__,
reason=_("Unable to import iboot library"))
self.power = iboot_power.IBootPower()
self.boot = pxe.PXEBoot()
self.deploy = iscsi_deploy.ISCSIDeploy()
self.vendor = iscsi_deploy.VendorPassthru()
class PXEIBootAgentDriver(base.BaseDriver):
"""PXE + IBoot PDU driver + Agent driver.
This driver implements the `core` functionality, combining
:class:`ironic.drivers.modules.pxe.PXEBoot` for boot and
:class:`ironic_staging_drivers.iboot.power.IBootPower` for power
and :class:'ironic.driver.modules.agent.AgentDeploy' for image
deployment. Implementations are in those respective classes;
this class is merely the glue between them.
"""
def __init__(self):
if not importutils.try_import('iboot'):
raise ironic_exception.DriverLoadError(
driver=self.__class__.__name__,
reason=_("Unable to import iboot library"))
self.power = iboot_power.IBootPower()
self.boot = pxe.PXEBoot()
self.deploy = agent.AgentDeploy()
self.vendor = agent.AgentVendorInterface()

View File

@ -0,0 +1,287 @@
# Copyright 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Ironic iBoot PDU power manager.
"""
import time
from ironic.common import exception as ironic_exception
from ironic.common import states
from ironic.conductor import task_manager
from ironic.drivers import base
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import importutils
import six
from ironic_staging_drivers.common.i18n import _
from ironic_staging_drivers.common.i18n import _LW
from ironic_staging_drivers.common import utils
iboot = importutils.try_import('iboot')
LOG = logging.getLogger(__name__)
opts = [
cfg.IntOpt('max_retry',
default=3,
min=0,
help=_('Maximum retries for iBoot operations')),
cfg.IntOpt('retry_interval',
default=1,
min=0,
help=_('Time (in seconds) between retry attempts for iBoot '
'operations')),
cfg.IntOpt('reboot_delay',
default=5,
min=0,
help=_('Time (in seconds) to sleep between when rebooting '
'(powering off and on again).'))
]
CONF = cfg.CONF
opt_group = cfg.OptGroup(name='iboot',
title='Options for the iBoot power driver')
CONF.register_group(opt_group)
try:
CONF.register_opts(opts, opt_group)
except cfg.DuplicateOptError:
LOG.warning('The iboot configuration options have been registered '
'already. Is the iboot driver included in both ironic '
'and ironic staging drivers?')
REQUIRED_PROPERTIES = {
'iboot_address': _("IP address of the PDU. Required."),
'iboot_username': _("username. Required."),
'iboot_password': _("password. Required."),
}
OPTIONAL_PROPERTIES = {
'iboot_relay_id': _("iBoot PDU relay id; default is 1. Optional."),
'iboot_port': _("iBoot PDU port; default is 9100. Optional."),
}
COMMON_PROPERTIES = REQUIRED_PROPERTIES.copy()
COMMON_PROPERTIES.update(OPTIONAL_PROPERTIES)
def _parse_driver_info(node):
info = node.driver_info or {}
missing_info = [key for key in REQUIRED_PROPERTIES if not info.get(key)]
if missing_info:
raise ironic_exception.MissingParameterValue(
_("Missing the following iBoot credentials in node's"
" driver_info: %s.") % missing_info)
address = info['iboot_address']
username = info['iboot_username']
password = info['iboot_password']
relay_id = info.get('iboot_relay_id', 1)
try:
relay_id = int(relay_id)
except ValueError:
raise ironic_exception.InvalidParameterValue(
_("iBoot PDU relay id must be an integer."))
port = info.get('iboot_port', 9100)
port = utils.validate_network_port(port, 'iboot_port')
return {
'address': address,
'username': username,
'password': password,
'port': port,
'relay_id': relay_id,
'uuid': node.uuid,
}
def _get_connection(driver_info):
# NOTE: python-iboot wants username and password as strings (not unicode)
return iboot.iBootInterface(driver_info['address'],
six.binary_type(driver_info['username']),
six.binary_type(driver_info['password']),
port=driver_info['port'],
num_relays=driver_info['relay_id'])
def _switch(driver_info, enabled):
conn = _get_connection(driver_info)
relay_id = driver_info['relay_id']
def _wait_for_switch(mutable):
if mutable['retries'] > CONF.iboot.max_retry:
LOG.warning(_LW(
'Reached maximum number of attempts (%(attempts)d) to set '
'power state for node %(node)s to "%(op)s"'),
{'attempts': mutable['retries'], 'node': driver_info['uuid'],
'op': states.POWER_ON if enabled else states.POWER_OFF})
raise loopingcall.LoopingCallDone()
try:
mutable['retries'] += 1
mutable['response'] = conn.switch(relay_id, enabled)
if mutable['response']:
raise loopingcall.LoopingCallDone()
except (TypeError, IndexError):
LOG.warning(_LW("Cannot call set power state for node '%(node)s' "
"at relay '%(relay)s'. iBoot switch() failed."),
{'node': driver_info['uuid'], 'relay': relay_id})
mutable = {'response': False, 'retries': 0}
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_switch,
mutable)
timer.start(interval=CONF.iboot.retry_interval).wait()
return mutable['response']
def _sleep_switch(seconds):
"""Function broken out for testing purpose."""
time.sleep(seconds)
def _check_power_state(driver_info, pstate):
"""Function to check power state is correct. Up to max retries."""
# always try once + number of retries
for num in range(0, 1 + CONF.iboot.max_retry):
state = _power_status(driver_info)
if state == pstate:
return
if num < CONF.iboot.max_retry:
time.sleep(CONF.iboot.retry_interval)
raise ironic_exception.PowerStateFailure(pstate=pstate)
def _power_status(driver_info):
conn = _get_connection(driver_info)
relay_id = driver_info['relay_id']
def _wait_for_power_status(mutable):
if mutable['retries'] > CONF.iboot.max_retry:
LOG.warning(_LW(
'Reached maximum number of attempts (%(attempts)d) to get '
'power state for node %(node)s'),
{'attempts': mutable['retries'], 'node': driver_info['uuid']})
raise loopingcall.LoopingCallDone()
try:
mutable['retries'] += 1
response = conn.get_relays()
status = response[relay_id - 1]
if status:
mutable['state'] = states.POWER_ON
else:
mutable['state'] = states.POWER_OFF
raise loopingcall.LoopingCallDone()
except (TypeError, IndexError):
LOG.warning(_LW("Cannot get power state for node '%(node)s' at "
"relay '%(relay)s'. iBoot get_relays() failed."),
{'node': driver_info['uuid'], 'relay': relay_id})
mutable = {'state': states.ERROR, 'retries': 0}
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_power_status,
mutable)
timer.start(interval=CONF.iboot.retry_interval).wait()
return mutable['state']
class IBootPower(base.PowerInterface):
"""iBoot PDU Power Driver for Ironic
This PowerManager class provides a mechanism for controlling power state
via an iBoot capable device.
Requires installation of python-iboot:
https://github.com/darkip/python-iboot
"""
def get_properties(self):
return COMMON_PROPERTIES
def validate(self, task):
"""Validate driver_info for iboot driver.
:param task: a TaskManager instance containing the node to act on.
:raises: InvalidParameterValue if iboot parameters are invalid.
:raises: MissingParameterValue if required iboot parameters are
missing.
"""
_parse_driver_info(task.node)
def get_power_state(self, task):
"""Get the current power state of the task's node.
:param task: a TaskManager instance containing the node to act on.
:returns: one of ironic.common.states POWER_OFF, POWER_ON or ERROR.
:raises: InvalidParameterValue if iboot parameters are invalid.
:raises: MissingParameterValue if required iboot parameters are
missing.
"""
driver_info = _parse_driver_info(task.node)
return _power_status(driver_info)
@task_manager.require_exclusive_lock
def set_power_state(self, task, pstate):
"""Turn the power on or off.
:param task: a TaskManager instance containing the node to act on.
:param pstate: The desired power state, one of ironic.common.states
POWER_ON, POWER_OFF.
:raises: InvalidParameterValue if iboot parameters are invalid or if
an invalid power state was specified.
:raises: MissingParameterValue if required iboot parameters are
missing.
:raises: PowerStateFailure if the power couldn't be set to pstate.
"""
driver_info = _parse_driver_info(task.node)
if pstate == states.POWER_ON:
_switch(driver_info, True)
elif pstate == states.POWER_OFF:
_switch(driver_info, False)
else:
raise ironic_exception.InvalidParameterValue(
_("set_power_state called with invalid "
"power state %s.") % pstate)
_check_power_state(driver_info, pstate)
@task_manager.require_exclusive_lock
def reboot(self, task):
"""Cycles the power to the task's node.
:param task: a TaskManager instance containing the node to act on.
:raises: InvalidParameterValue if iboot parameters are invalid.
:raises: MissingParameterValue if required iboot parameters are
missing.
:raises: PowerStateFailure if the final state of the node is not
POWER_ON.
"""
driver_info = _parse_driver_info(task.node)
_switch(driver_info, False)
_sleep_switch(CONF.iboot.reboot_delay)
_switch(driver_info, True)
_check_power_state(driver_info, states.POWER_ON)

View File

@ -0,0 +1,16 @@
# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ironic_staging_drivers.tests.unit import third_party_driver_mocks # noqa

View File

@ -0,0 +1,417 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Test class for iBoot PDU driver module."""
import types
import mock
from ironic.common import driver_factory
from ironic.common import exception as ironic_exception
from ironic.common import states
from ironic.conductor import task_manager
from ironic.tests.unit.conductor import mgr_utils
from ironic.tests.unit.db import base as db_base
from ironic.tests.unit.objects import utils as obj_utils
from ironic_staging_drivers.iboot import power as iboot_power
INFO_DICT = {
'iboot_address': '1.2.3.4',
'iboot_username': 'admin',
'iboot_password': 'fake',
}
class IBootPrivateMethodTestCase(db_base.DbTestCase):
def setUp(self):
super(IBootPrivateMethodTestCase, self).setUp()
self.config(max_retry=0, group='iboot')
self.config(retry_interval=0, group='iboot')
def test__parse_driver_info_good(self):
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
info = iboot_power._parse_driver_info(node)
self.assertIsNotNone(info.get('address'))
self.assertIsNotNone(info.get('username'))
self.assertIsNotNone(info.get('password'))
self.assertIsNotNone(info.get('port'))
self.assertIsNotNone(info.get('relay_id'))
def test__parse_driver_info_good_with_explicit_port(self):
info = dict(INFO_DICT)
info['iboot_port'] = '1234'
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=info)
info = iboot_power._parse_driver_info(node)
self.assertEqual(1234, info.get('port'))
def test__parse_driver_info_good_with_explicit_relay_id(self):
info = dict(INFO_DICT)
info['iboot_relay_id'] = '2'
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=info)
info = iboot_power._parse_driver_info(node)
self.assertEqual(2, info.get('relay_id'))
def test__parse_driver_info_missing_address(self):
info = dict(INFO_DICT)
del info['iboot_address']
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=info)
self.assertRaises(ironic_exception.MissingParameterValue,
iboot_power._parse_driver_info,
node)
def test__parse_driver_info_missing_username(self):
info = dict(INFO_DICT)
del info['iboot_username']
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=info)
self.assertRaises(ironic_exception.MissingParameterValue,
iboot_power._parse_driver_info,
node)
def test__parse_driver_info_missing_password(self):
info = dict(INFO_DICT)
del info['iboot_password']
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=info)
self.assertRaises(ironic_exception.MissingParameterValue,
iboot_power._parse_driver_info,
node)
def test__parse_driver_info_bad_port(self):
info = dict(INFO_DICT)
info['iboot_port'] = 'not-integer'
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=info)
self.assertRaises(ironic_exception.InvalidParameterValue,
iboot_power._parse_driver_info,
node)
def test__parse_driver_info_bad_relay_id(self):
info = dict(INFO_DICT)
info['iboot_relay_id'] = 'not-integer'
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=info)
self.assertRaises(ironic_exception.InvalidParameterValue,
iboot_power._parse_driver_info,
node)
@mock.patch.object(iboot_power, '_get_connection', autospec=True)
def test__power_status_on(self, mock_get_conn):
mock_connection = mock.MagicMock(spec_set=['get_relays'])
mock_connection.get_relays.return_value = [True]
mock_get_conn.return_value = mock_connection
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
info = iboot_power._parse_driver_info(node)
status = iboot_power._power_status(info)
self.assertEqual(states.POWER_ON, status)
mock_get_conn.assert_called_once_with(info)
mock_connection.get_relays.assert_called_once_with()
@mock.patch.object(iboot_power, '_get_connection', autospec=True)
def test__power_status_off(self, mock_get_conn):
mock_connection = mock.MagicMock(spec_set=['get_relays'])
mock_connection.get_relays.return_value = [False]
mock_get_conn.return_value = mock_connection
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
info = iboot_power._parse_driver_info(node)
status = iboot_power._power_status(info)
self.assertEqual(states.POWER_OFF, status)
mock_get_conn.assert_called_once_with(info)
mock_connection.get_relays.assert_called_once_with()
@mock.patch.object(iboot_power, '_get_connection', autospec=True)
def test__power_status_ironic_exception(self, mock_get_conn):
mock_connection = mock.MagicMock(spec_set=['get_relays'])
mock_connection.get_relays.return_value = None
mock_get_conn.return_value = mock_connection
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
info = iboot_power._parse_driver_info(node)
status = iboot_power._power_status(info)
self.assertEqual(states.ERROR, status)
mock_get_conn.assert_called_once_with(info)
mock_connection.get_relays.assert_called_once_with()
@mock.patch.object(iboot_power, '_get_connection', autospec=True)
def test__power_status_ironic_exception_type_error(self, mock_get_conn):
mock_connection = mock.MagicMock(spec_set=['get_relays'])
side_effect = TypeError("Surprise!")
mock_connection.get_relays.side_effect = side_effect
mock_get_conn.return_value = mock_connection
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
info = iboot_power._parse_driver_info(node)
status = iboot_power._power_status(info)
self.assertEqual(states.ERROR, status)
mock_get_conn.assert_called_once_with(info)
mock_connection.get_relays.assert_called_once_with()
@mock.patch.object(iboot_power, '_get_connection', autospec=True)
def test__power_status_ironic_exception_index_error(self, mock_get_conn):
mock_connection = mock.MagicMock(spec_set=['get_relays'])
side_effect = IndexError("Gotcha!")
mock_connection.get_relays.side_effect = side_effect
mock_get_conn.return_value = mock_connection
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
info = iboot_power._parse_driver_info(node)
status = iboot_power._power_status(info)
self.assertEqual(states.ERROR, status)
mock_get_conn.assert_called_once_with(info)
mock_connection.get_relays.assert_called_once_with()
@mock.patch.object(iboot_power, '_get_connection', autospec=True)
def test__power_status_error(self, mock_get_conn):
mock_connection = mock.MagicMock(spec_set=['get_relays'])
mock_connection.get_relays.return_value = list()
mock_get_conn.return_value = mock_connection
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
info = iboot_power._parse_driver_info(node)
status = iboot_power._power_status(info)
self.assertEqual(states.ERROR, status)
mock_get_conn.assert_called_once_with(info)
mock_connection.get_relays.assert_called_once_with()
@mock.patch.object(iboot_power, '_get_connection', autospec=True)
def test__power_status_retries(self, mock_get_conn):
self.config(max_retry=1, group='iboot')
mock_connection = mock.MagicMock(spec_set=['get_relays'])
side_effect = TypeError("Surprise!")
mock_connection.get_relays.side_effect = side_effect
mock_get_conn.return_value = mock_connection
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
info = iboot_power._parse_driver_info(node)
status = iboot_power._power_status(info)
self.assertEqual(states.ERROR, status)
mock_get_conn.assert_called_once_with(info)
self.assertEqual(2, mock_connection.get_relays.call_count)
class IBootDriverTestCase(db_base.DbTestCase):
def setUp(self):
super(IBootDriverTestCase, self).setUp()
self.config(max_retry=0, group='iboot')
self.config(retry_interval=0, group='iboot')
self.config(reboot_delay=0, group='iboot')
mgr_utils.mock_the_extension_manager(driver='fake_iboot_fake')
self.driver = driver_factory.get_driver('fake_iboot_fake')
self.node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
self.info = iboot_power._parse_driver_info(self.node)
def test_get_properties(self):
expected = iboot_power.COMMON_PROPERTIES
with task_manager.acquire(
self.context, self.node.uuid, shared=True) as task:
self.assertEqual(expected, task.driver.get_properties())
@mock.patch.object(iboot_power, '_power_status', autospec=True)
@mock.patch.object(iboot_power, '_switch', autospec=True)
def test_set_power_state_good(self, mock_switch, mock_power_status):
mock_power_status.return_value = states.POWER_ON
with task_manager.acquire(self.context, self.node.uuid) as task:
task.driver.power.set_power_state(task, states.POWER_ON)
# ensure functions were called with the valid parameters
mock_switch.assert_called_once_with(self.info, True)
mock_power_status.assert_called_once_with(self.info)
@mock.patch.object(iboot_power, '_power_status', autospec=True)
@mock.patch.object(iboot_power, '_switch', autospec=True)
def test_set_power_state_bad(self, mock_switch, mock_power_status):
mock_power_status.return_value = states.POWER_OFF
with task_manager.acquire(self.context, self.node.uuid) as task:
self.assertRaises(ironic_exception.PowerStateFailure,
task.driver.power.set_power_state,
task, states.POWER_ON)
# ensure functions were called with the valid parameters
mock_switch.assert_called_once_with(self.info, True)
mock_power_status.assert_called_once_with(self.info)
@mock.patch.object(iboot_power, '_power_status', autospec=True)
@mock.patch.object(iboot_power, '_switch', autospec=True)
def test_set_power_state_retry(self, mock_switch, mock_power_status):
self.config(max_retry=2, group='iboot')
mock_power_status.return_value = states.POWER_OFF
with task_manager.acquire(self.context, self.node.uuid) as task:
self.assertRaises(ironic_exception.PowerStateFailure,
task.driver.power.set_power_state,
task, states.POWER_ON)
# ensure functions were called with the valid parameters
mock_switch.assert_called_once_with(self.info, True)
# 1 + 2 retries
self.assertEqual(3, mock_power_status.call_count)
@mock.patch.object(iboot_power, '_power_status', autospec=True)
@mock.patch.object(iboot_power, '_switch', autospec=True)
def test_set_power_state_invalid_parameter(self, mock_switch,
mock_power_status):
mock_power_status.return_value = states.POWER_ON
with task_manager.acquire(self.context, self.node.uuid) as task:
self.assertRaises(ironic_exception.InvalidParameterValue,
task.driver.power.set_power_state,
task, states.NOSTATE)
@mock.patch.object(iboot_power, '_sleep_switch',
spec_set=types.FunctionType)
@mock.patch.object(iboot_power, '_power_status', autospec=True)
@mock.patch.object(iboot_power, '_switch', spec_set=types.FunctionType)
def test_reboot_good(self, mock_switch, mock_power_status,
mock_sleep_switch):
self.config(reboot_delay=3, group='iboot')
manager = mock.MagicMock(spec_set=['switch', 'sleep'])
mock_power_status.return_value = states.POWER_ON
manager.attach_mock(mock_switch, 'switch')
manager.attach_mock(mock_sleep_switch, 'sleep')
expected = [mock.call.switch(self.info, False),
mock.call.sleep(3),
mock.call.switch(self.info, True)]
with task_manager.acquire(self.context, self.node.uuid) as task:
task.driver.power.reboot(task)
self.assertEqual(expected, manager.mock_calls)
@mock.patch.object(iboot_power, '_sleep_switch',
spec_set=types.FunctionType)
@mock.patch.object(iboot_power, '_power_status', autospec=True)
@mock.patch.object(iboot_power, '_switch', spec_set=types.FunctionType)
def test_reboot_bad(self, mock_switch, mock_power_status,
mock_sleep_switch):
self.config(reboot_delay=3, group='iboot')
manager = mock.MagicMock(spec_set=['switch', 'sleep'])
mock_power_status.return_value = states.POWER_OFF
manager.attach_mock(mock_switch, 'switch')
manager.attach_mock(mock_sleep_switch, 'sleep')
expected = [mock.call.switch(self.info, False),
mock.call.sleep(3),
mock.call.switch(self.info, True)]
with task_manager.acquire(self.context, self.node.uuid) as task:
self.assertRaises(ironic_exception.PowerStateFailure,
task.driver.power.reboot, task)
self.assertEqual(expected, manager.mock_calls)
@mock.patch.object(iboot_power, '_power_status', autospec=True)
@mock.patch.object(iboot_power, '_get_connection', autospec=True)
def test__switch_retries(self, mock_get_conn, mock_power_status):
self.config(max_retry=1, group='iboot')
mock_power_status.return_value = states.POWER_ON
mock_connection = mock.MagicMock(spec_set=['switch'])
side_effect = TypeError("Surprise!")
mock_connection.switch.side_effect = side_effect
mock_get_conn.return_value = mock_connection
iboot_power._switch(self.info, False)
self.assertEqual(2, mock_connection.switch.call_count)
@mock.patch.object(iboot_power, '_power_status', autospec=True)
def test_get_power_state(self, mock_power_status):
mock_power_status.return_value = states.POWER_ON
with task_manager.acquire(self.context, self.node.uuid) as task:
state = task.driver.power.get_power_state(task)
self.assertEqual(state, states.POWER_ON)
# ensure functions were called with the valid parameters
mock_power_status.assert_called_once_with(self.info)
@mock.patch.object(iboot_power, '_parse_driver_info', autospec=True)
def test_validate_good(self, parse_drv_info_mock):
with task_manager.acquire(self.context, self.node.uuid,
shared=True) as task:
task.driver.power.validate(task)
self.assertEqual(1, parse_drv_info_mock.call_count)
@mock.patch.object(iboot_power, '_parse_driver_info', autospec=True)
def test_validate_fails(self, parse_drv_info_mock):
side_effect = ironic_exception.InvalidParameterValue("Bad input")
parse_drv_info_mock.side_effect = side_effect
with task_manager.acquire(self.context, self.node.uuid,
shared=True) as task:
self.assertRaises(ironic_exception.InvalidParameterValue,
task.driver.power.validate, task)
self.assertEqual(1, parse_drv_info_mock.call_count)

View File

@ -0,0 +1,22 @@
# Copyright 2015 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""This module provides mock 'specs' for third party modules that can be used
when needing to mock those third party modules"""
# iboot
IBOOT_SPEC = (
'iBootInterface',
)

View File

@ -0,0 +1,49 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""This module detects whether third-party libraries, utilized by third-party
drivers, are present on the system. If they are not, it mocks them and tinkers
with sys.modules so that the drivers can be loaded by unit tests, and the unit
tests can continue to test the functionality of those drivers without the
respective external libraries' actually being present.
Any external library required by a third-party driver should be mocked here.
Current list of mocked libraries:
- iboot
"""
import sys
import mock
from oslo_utils import importutils
import six
from ironic_staging_drivers.tests.unit import third_party_driver_mock_specs \
as mock_specs
# attempt to load the external 'iboot' library, which is required by
# the optional drivers.modules.iboot module
iboot = importutils.try_import('iboot')
if not iboot:
ib = mock.MagicMock(spec_set=mock_specs.IBOOT_SPEC)
ib.iBootInterface = mock.MagicMock(spec_set=[])
sys.modules['iboot'] = ib
# if anything has loaded the iboot driver yet, reload it now that the
# external library has been mocked
if 'ironic.drivers.modules.iboot' in sys.modules:
six.moves.reload_module(sys.modules['ironic.drivers.modules.iboot'])

View File

@ -11,3 +11,4 @@ oslo.log>=1.14.0 # Apache-2.0
oslo.utils>=3.5.0 # Apache-2.0
six>=1.9.0 # MIT
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
oslo.service>=1.10.0 # Apache-2.0

View File

@ -35,6 +35,9 @@ ironic.drivers =
pxe_libvirt_agent = ironic_staging_drivers.libvirt:PXELibvirtAgentDriver
pxe_libvirt_iscsi = ironic_staging_drivers.libvirt:PXELibvirtISCSIDriver
fake_libvirt_fake = ironic_staging_drivers.libvirt:FakeLibvirtFakeDriver
fake_iboot_fake = ironic_staging_drivers.iboot:FakeIBootFakeDriver
pxe_iboot_iscsi = ironic_staging_drivers.iboot:PXEIBootISCSIDriver
pxe_iboot_agent = ironic_staging_drivers.iboot:PXEIBootAgentDriver
[build_sphinx]
source-dir = doc/source