Update IPMI driver to work with resource manager.

Update the IPMI driver so that Ironic's resource manager can maintain a
single instance of it, and pass (task, node) to IPMI methods.  This
brings the IPMI class in line with the base ControlDriver class
definition.

Add a missing exception to common/exceptions.py.

IPMI driver methods presently support being called for a single node at
a time.  Support for multiple nodes passed down by the TaskManager is
not part of this patch.

Remove terminal support. To be added back later.

Change-Id: Ibb9fc84f7b46d848b0c1ac761a3be952c99f7509
This commit is contained in:
Devananda van der Veen 2013-05-24 09:59:18 -07:00
parent 438b77e006
commit 378dfac26b
3 changed files with 333 additions and 320 deletions

View File

@ -194,6 +194,12 @@ class InvalidMAC(Invalid):
message = _("Expected a MAC address but received %(mac)s.")
# Cannot be templated as the error syntax varies.
# msg needs to be constructed when raised.
class InvalidParameterValue(Invalid):
message = _("%(err)s")
class NotFound(IronicException):
message = _("Resource could not be found.")
code = 404
@ -246,3 +252,7 @@ class PowerStateFailure(IronicException):
class ExclusiveLockRequired(NotAuthorized):
message = _("An exclusive lock is required, "
"but the current context has a shared lock.")
class IPMIFailure(IronicException):
message = _("IPMI command failed: %(cmd)s.")

View File

@ -56,7 +56,10 @@ CONF.register_opts(opts)
LOG = logging.getLogger(__name__)
VALID_BOOT_DEVICES = ['pxe', 'disk', 'safe', 'cdrom', 'bios']
# TODO(deva): use a contextmanager for this, and port it to nova.
def _make_password_file(password):
fd, path = tempfile.mkstemp()
os.fchmod(fd, stat.S_IRUSR | stat.S_IWUSR)
@ -65,22 +68,117 @@ def _make_password_file(password):
return path
def _get_console_pid_path(node_id):
name = "%s.pid" % node_id
path = os.path.join(CONF.terminal_pid_dir, name)
return path
def _parse_control_info(node):
info = json.loads(node.get('control_info', ''))
address = info.get('ipmi_address', None)
user = info.get('ipmi_username', None)
password = info.get('ipmi_password', None)
port = info.get('ipmi_terminal_port', None)
if not address or not user or not password:
raise exception.InvalidParameterValue(_(
"IPMI credentials not supplied to IPMI driver."))
return {
'address': address,
'user': user,
'password': password,
'port': port,
'uuid': node.get('uuid')
}
def _get_console_pid(node_id):
pid_path = _get_console_pid_path(node_id)
if os.path.exists(pid_path):
with open(pid_path, 'r') as f:
pid_str = f.read()
def _exec_ipmitool(c_info, command):
args = ['ipmitool',
'-I',
'lanplus',
'-H',
c_info['address'],
'-U',
c_info['user'],
'-f']
try:
pwfile = _make_password_file(c_info['password'])
args.append(pwfile)
args.extend(command.split(" "))
out, err = utils.execute(*args, attempts=3)
LOG.debug(_("ipmitool stdout: '%(out)s', stderr: '%(err)s'"),
locals())
return out, err
finally:
utils.delete_if_exists(pwfile)
def _power_on(c_info):
"""Turn the power to this node ON."""
# use mutable objects so the looped method can change them
state = [None]
retries = [0]
def _wait_for_power_on(state, retries):
"""Called at an interval until the node's power is on."""
state[0] = _power_status(c_info)
if state[0] == states.POWER_ON:
raise loopingcall.LoopingCallDone()
if retries[0] > CONF.ipmi_power_retry:
state[0] = states.ERROR
raise loopingcall.LoopingCallDone()
try:
return int(pid_str)
except ValueError:
LOG.warn(_("pid file %s does not contain any pid"), pid_path)
return None
retries[0] += 1
_exec_ipmitool(c_info, "power on")
except Exception:
# Log failures but keep trying
LOG.warning(_("IPMI power on failed for node %s.")
% c_info['uuid'])
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_power_on,
state, retries)
timer.start(interval=1).wait()
return state[0]
def _power_off(c_info):
"""Turn the power to this node OFF."""
# use mutable objects so the looped method can change them
state = [None]
retries = [0]
def _wait_for_power_off(state, retries):
"""Called at an interval until the node's power is off."""
state[0] = _power_status(c_info)
if state[0] == states.POWER_OFF:
raise loopingcall.LoopingCallDone()
if retries[0] > CONF.ipmi_power_retry:
state[0] = states.ERROR
raise loopingcall.LoopingCallDone()
try:
retries[0] += 1
_exec_ipmitool(c_info, "power off")
except Exception:
# Log failures but keep trying
LOG.warning(_("IPMI power off failed for node %s.")
% c_info['uuid'])
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_power_off,
state=state, retries=retries)
timer.start(interval=1).wait()
return state[0]
def _power_status(c_info):
out_err = _exec_ipmitool(c_info, "power status")
if out_err[0] == "Chassis Power is on\n":
return states.POWER_ON
elif out_err[0] == "Chassis Power is off\n":
return states.POWER_OFF
else:
return states.ERROR
class IPMIPowerDriver(base.ControlDriver):
@ -89,192 +187,81 @@ class IPMIPowerDriver(base.ControlDriver):
This ControlDriver class provides mechanism for controlling the power state
of physical hardware via IPMI calls. It also provides console access for
some supported hardware.
NOTE: This driver does not currently support multi-node operations.
"""
def __init__(self, node, **kwargs):
self.state = None
self.retries = None
self.node_id = node['id']
self.power_info = json.loads(node['control_info'])
self.address = self.power_info.get('ipmi_address', None)
self.user = self.power_info.get('ipmi_username', None)
self.password = self.power_info.get('ipmi_password', None)
self.port = self.power_info.get('terminal_port', None)
if self.node_id is None:
raise exception.InvalidParameterValue(_("Node id not supplied "
"to IPMI"))
if self.address is None:
raise exception.InvalidParameterValue(_("Address not supplied "
"to IPMI"))
if self.user is None:
raise exception.InvalidParameterValue(_("User not supplied "
"to IPMI"))
if self.password is None:
raise exception.InvalidParameterValue(_("Password not supplied "
"to IPMI"))
def _exec_ipmitool(self, command):
args = ['ipmitool',
'-I',
'lanplus',
'-H',
self.address,
'-U',
self.user,
'-f']
pwfile = _make_password_file(self.password)
try:
args.append(pwfile)
args.extend(command.split(" "))
out, err = utils.execute(*args, attempts=3)
LOG.debug(_("ipmitool stdout: '%(out)s', stderr: '%(err)s'"),
locals())
return out, err
finally:
utils.delete_if_exists(pwfile)
def _power_on(self):
"""Turn the power to this node ON."""
def _wait_for_power_on():
"""Called at an interval until the node's power is on."""
self._update_state()
if self.state == states.POWER_ON:
raise loopingcall.LoopingCallDone()
if self.retries > CONF.ipmi_power_retry:
self.state = states.ERROR
raise loopingcall.LoopingCallDone()
try:
self.retries += 1
self._exec_ipmitool("power on")
except Exception:
LOG.exception(_("IPMI power on failed"))
self.retries = 0
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_power_on)
timer.start(interval=1).wait()
def _power_off(self):
"""Turn the power to this node OFF."""
def _wait_for_power_off():
"""Called at an interval until the node's power is off."""
self._update_state()
if self.state == states.POWER_OFF:
raise loopingcall.LoopingCallDone()
if self.retries > CONF.ipmi_power_retry:
self.state = states.ERROR
raise loopingcall.LoopingCallDone()
try:
self.retries += 1
self._exec_ipmitool("power off")
except Exception:
LOG.exception(_("IPMI power off failed"))
self.retries = 0
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_power_off)
timer.start(interval=1).wait()
def _set_pxe_for_next_boot(self):
# FIXME: raise exception, not just log
# FIXME: make into a public set-boot function
try:
self._exec_ipmitool("chassis bootdev pxe")
except Exception:
LOG.exception(_("IPMI set next bootdev failed"))
def _update_state(self):
# FIXME: better error and other-state handling
out_err = self._exec_ipmitool("power status")
if out_err[0] == "Chassis Power is on\n":
self.state = states.POWER_ON
elif out_err[0] == "Chassis Power is off\n":
self.state = states.POWER_OFF
else:
self.state = states.ERROR
def validate_driver_info(node):
# Temporary stub so patch I8ba2f4bd70bd2d7af405868cca2aedb56d3f0640
# will pass. The next patch actually implements this.
def __init__(self):
pass
def get_power_state(self):
"""Checks and returns current power state."""
self._update_state()
return self.state
def validate_driver_info(self, node):
"""Check that node['control_info'] contains the requisite fields."""
try:
_parse_control_info(node)
except exception.InvalidParameterValue:
return False
return True
def set_power_state(self, pstate):
def get_power_state(self, task, node):
"""Get the current power state."""
c_info = _parse_control_info(node)
return _power_status(c_info)
def set_power_state(self, task, node, pstate):
"""Turn the power on or off."""
if self.state and self.state == pstate:
LOG.warning(_("set_power_state called with current state."))
c_info = _parse_control_info(node)
if pstate == states.POWER_ON:
self._set_pxe_for_next_boot()
self._power_on()
state = _power_on(c_info)
elif pstate == states.POWER_OFF:
self._power_off()
state = _power_off(c_info)
else:
LOG.error(_("set_power_state called with invalid pstate."))
raise exception.IronicException(_(
"set_power_state called with invalid power state."))
if self.state != pstate:
if state != pstate:
raise exception.PowerStateFailure(pstate=pstate)
def reboot(self):
def reboot(self, task, node):
"""Cycles the power to a node."""
self._power_off()
self._set_pxe_for_next_boot()
self._power_on()
c_info = _parse_control_info(node)
_power_off(c_info)
state = _power_on(c_info)
if self.state != states.POWER_ON:
if state != states.POWER_ON:
raise exception.PowerStateFailure(pstate=states.POWER_ON)
def start_console(self):
if not self.port:
return
args = []
args.append(CONF.terminal)
if CONF.terminal_cert_dir:
args.append("-c")
args.append(CONF.terminal_cert_dir)
else:
args.append("-t")
args.append("-p")
args.append(str(self.port))
args.append("--background=%s" % _get_console_pid_path(self.node_id))
args.append("-s")
def set_boot_device(self, task, node, device, persistent=False):
"""Set the boot device for a node.
:param task: TaskManager context.
:param node: The Node.
:param device: Boot device. One of [pxe, disk, cdrom, safe, bios].
:param persistent: Whether to set next-boot, or make the change
permanent. Default: False.
:raises: InvalidParameterValue if an invalid boot device is specified.
:raises: IPMIFailure on an error from ipmitool.
"""
if device not in VALID_BOOT_DEVICES:
raise exception.InvalidParameterValue(_(
"Invalid boot device %s specified.") % device)
cmd = "chassis bootdev %s" % device
if persistent:
cmd = cmd + " options=persistent"
c_info = _parse_control_info(node)
try:
pwfile = _make_password_file(self.password)
ipmi_args = "/:%(uid)s:%(gid)s:HOME:ipmitool -H %(address)s" \
" -I lanplus -U %(user)s -f %(pwfile)s sol activate" \
% {'uid': os.getuid(),
'gid': os.getgid(),
'address': self.address,
'user': self.user,
'pwfile': pwfile,
}
out, err = _exec_ipmitool(c_info, cmd)
# TODO(deva): validate (out, err) and add unit test for failure
except Exception:
raise exception.IPMIFailure(cmd=cmd)
args.append(ipmi_args)
# Run shellinaboxd without pipes. Otherwise utils.execute() waits
# infinitely since shellinaboxd does not close passed fds.
x = ["'" + arg.replace("'", "'\\''") + "'" for arg in args]
x.append('</dev/null')
x.append('>/dev/null')
x.append('2>&1')
utils.execute(' '.join(x), shell=True)
finally:
utils.delete_if_exists(pwfile)
# TODO(deva): port start_console
def start_console(self, task, node):
raise exception.IronicException(_(
"start_console is not supported by IPMIPowerDriver."))
def stop_console(self):
console_pid = _get_console_pid(self.node_id)
if console_pid:
# Allow exitcode 99 (RC_UNAUTHORIZED)
utils.execute('kill', '-TERM', str(console_pid),
run_as_root=True,
check_exit_code=[0, 99])
utils.delete_if_exists(_get_console_pid_path(self.node_id))
# TODO(deva): port stop_console
def stop_console(self, task, node):
raise exception.IronicException(_(
"stop_console is not supported by IPMIPowerDriver."))

View File

@ -21,10 +21,12 @@
import os
import stat
import tempfile
from oslo.config import cfg
from ironic.openstack.common import jsonutils as json
from ironic.common import exception
from ironic.common import states
from ironic.common import utils
from ironic.drivers import ipmi
@ -39,15 +41,9 @@ class BareMetalIPMITestCase(base.TestCase):
def setUp(self):
super(BareMetalIPMITestCase, self).setUp()
self.node = db_utils.get_test_node()
self.ipmi = ipmi.IPMIPowerDriver(self.node)
self.ipmi = ipmi.IPMIPowerDriver()
def test_construct(self):
self.assertEqual(self.ipmi.node_id, 123)
self.assertEqual(self.ipmi.address, '1.2.3.4')
self.assertEqual(self.ipmi.user, 'admin')
self.assertEqual(self.ipmi.password, 'fake')
def test_make_password_file(self):
def test__make_password_file(self):
fakepass = 'this is a fake password'
pw_file = ipmi._make_password_file(fakepass)
try:
@ -59,18 +55,39 @@ class BareMetalIPMITestCase(base.TestCase):
finally:
os.unlink(pw_file)
def test_exec_ipmitool(self):
def test__parse_control_info(self):
# make sure we get back the expected things
node = db_utils.get_test_node()
info = ipmi._parse_control_info(node)
self.assertIsNotNone(info.get('address'))
self.assertIsNotNone(info.get('user'))
self.assertIsNotNone(info.get('password'))
self.assertIsNotNone(info.get('uuid'))
# make sure error is raised when info, eg. username, is missing
_control_info = json.dumps(
{
"ipmi_address": "1.2.3.4",
"ipmi_password": "fake",
})
node = db_utils.get_test_node(control_info=_control_info)
self.assertRaises(exception.InvalidParameterValue,
ipmi._parse_control_info,
node)
def test__exec_ipmitool(self):
pw_file = '/tmp/password_file'
info = ipmi._parse_control_info(self.node)
self.mox.StubOutWithMock(ipmi, '_make_password_file')
self.mox.StubOutWithMock(utils, 'execute')
self.mox.StubOutWithMock(utils, 'delete_if_exists')
ipmi._make_password_file(self.ipmi.password).AndReturn(pw_file)
ipmi._make_password_file(info['password']).AndReturn(pw_file)
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.ipmi.address,
'-U', self.ipmi.user,
'-H', info['address'],
'-U', info['user'],
'-f', pw_file,
'A', 'B', 'C',
]
@ -78,176 +95,175 @@ class BareMetalIPMITestCase(base.TestCase):
utils.delete_if_exists(pw_file).AndReturn(None)
self.mox.ReplayAll()
self.ipmi._exec_ipmitool('A B C')
ipmi._exec_ipmitool(info, 'A B C')
self.mox.VerifyAll()
def test_update_state_on(self):
self.mox.StubOutWithMock(self.ipmi, '_exec_ipmitool')
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is on\n"])
def test__power_status_on(self):
self.mox.StubOutWithMock(ipmi, '_exec_ipmitool')
info = ipmi._parse_control_info(self.node)
ipmi._exec_ipmitool(info, "power status").AndReturn(
["Chassis Power is on\n", None])
self.mox.ReplayAll()
self.ipmi.state = states.NOSTATE
self.ipmi._update_state()
state = ipmi._power_status(info)
self.mox.VerifyAll()
self.assertEqual(self.ipmi.state, states.POWER_ON)
self.assertEqual(state, states.POWER_ON)
def test_update_state_off(self):
self.mox.StubOutWithMock(self.ipmi, '_exec_ipmitool')
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is off\n"])
def test__power_status_off(self):
self.mox.StubOutWithMock(ipmi, '_exec_ipmitool')
info = ipmi._parse_control_info(self.node)
ipmi._exec_ipmitool(info, "power status").AndReturn(
["Chassis Power is off\n", None])
self.mox.ReplayAll()
self.ipmi.state = states.NOSTATE
self.ipmi._update_state()
state = ipmi._power_status(info)
self.mox.VerifyAll()
self.assertEqual(self.ipmi.state, states.POWER_OFF)
self.assertEqual(state, states.POWER_OFF)
def test_update_state_error(self):
self.mox.StubOutWithMock(self.ipmi, '_exec_ipmitool')
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is foobar\n"])
def test__power_status_error(self):
self.mox.StubOutWithMock(ipmi, '_exec_ipmitool')
info = ipmi._parse_control_info(self.node)
ipmi._exec_ipmitool(info, "power status").AndReturn(
["Chassis Power is badstate\n", None])
self.mox.ReplayAll()
self.ipmi.state = states.NOSTATE
self.ipmi._update_state()
state = ipmi._power_status(info)
self.mox.VerifyAll()
self.assertEqual(state, states.ERROR)
def test__power_on_max_retries(self):
self.flags(ipmi_power_retry=2)
self.mox.StubOutWithMock(ipmi, '_exec_ipmitool')
info = ipmi._parse_control_info(self.node)
ipmi._exec_ipmitool(info, "power status").AndReturn(
["Chassis Power is off\n", None])
ipmi._exec_ipmitool(info, "power on").AndReturn([None, None])
ipmi._exec_ipmitool(info, "power status").AndReturn(
["Chassis Power is off\n", None])
ipmi._exec_ipmitool(info, "power on").AndReturn([None, None])
ipmi._exec_ipmitool(info, "power status").AndReturn(
["Chassis Power is off\n", None])
ipmi._exec_ipmitool(info, "power on").AndReturn([None, None])
ipmi._exec_ipmitool(info, "power status").AndReturn(
["Chassis Power is off\n", None])
self.mox.ReplayAll()
state = ipmi._power_on(info)
self.mox.VerifyAll()
self.assertEqual(state, states.ERROR)
def test_get_power_state(self):
info = ipmi._parse_control_info(self.node)
self.mox.StubOutWithMock(ipmi, '_exec_ipmitool')
ipmi._exec_ipmitool(info, "power status").AndReturn(
["Chassis Power is off\n", None])
ipmi._exec_ipmitool(info, "power status").AndReturn(
["Chassis Power is on\n", None])
ipmi._exec_ipmitool(info, "power status").AndReturn(
["\n", None])
self.mox.ReplayAll()
pstate = self.ipmi.get_power_state(None, self.node)
self.assertEqual(pstate, states.POWER_OFF)
pstate = self.ipmi.get_power_state(None, self.node)
self.assertEqual(pstate, states.POWER_ON)
pstate = self.ipmi.get_power_state(None, self.node)
self.assertEqual(pstate, states.ERROR)
self.mox.VerifyAll()
self.assertEqual(self.ipmi.state, states.ERROR)
def test_set_power_on_ok(self):
self.flags(ipmi_power_retry=0)
self.mox.StubOutWithMock(self.ipmi, '_exec_ipmitool')
info = ipmi._parse_control_info(self.node)
self.mox.StubOutWithMock(ipmi, '_power_on')
self.mox.StubOutWithMock(ipmi, '_power_off')
self.ipmi._exec_ipmitool("set bootdev pxe").AndReturn([])
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is off\n"])
self.ipmi._exec_ipmitool("power on").AndReturn([])
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is on\n"])
ipmi._power_on(info).AndReturn(states.POWER_ON)
self.mox.ReplayAll()
self.ipmi.state = states.POWER_OFF
self.ipmi.set_power_state(states.POWER_ON)
self.ipmi.set_power_state(None, self.node, states.POWER_ON)
self.mox.VerifyAll()
self.assertEqual(self.ipmi.state, states.POWER_ON)
def test_set_power_off_ok(self):
self.flags(ipmi_power_retry=0)
self.mox.StubOutWithMock(self.ipmi, '_exec_ipmitool')
info = ipmi._parse_control_info(self.node)
self.mox.StubOutWithMock(ipmi, '_power_on')
self.mox.StubOutWithMock(ipmi, '_power_off')
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is on\n"])
self.ipmi._exec_ipmitool("power off").AndReturn([])
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is off\n"])
ipmi._power_off(info).AndReturn(states.POWER_OFF)
self.mox.ReplayAll()
self.ipmi.state = states.POWER_ON
self.ipmi._power_off()
self.ipmi.set_power_state(None, self.node, states.POWER_OFF)
self.mox.VerifyAll()
self.assertEqual(self.ipmi.state, states.POWER_OFF)
def test_power_on_fail(self):
def test_set_power_on_fail(self):
self.flags(ipmi_power_retry=0)
self.mox.StubOutWithMock(self.ipmi, '_exec_ipmitool')
info = ipmi._parse_control_info(self.node)
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is off\n"])
self.ipmi._exec_ipmitool("power on").AndReturn([])
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is off\n"])
self.mox.StubOutWithMock(ipmi, '_power_on')
self.mox.StubOutWithMock(ipmi, '_power_off')
ipmi._power_on(info).AndReturn(states.ERROR)
self.mox.ReplayAll()
self.ipmi.state = states.POWER_OFF
self.ipmi._power_on()
self.assertRaises(exception.PowerStateFailure,
self.ipmi.set_power_state,
None,
self.node,
states.POWER_ON)
self.mox.VerifyAll()
self.assertEqual(self.ipmi.state, states.ERROR)
def test_power_on_max_retries(self):
self.flags(ipmi_power_retry=2)
self.mox.StubOutWithMock(self.ipmi, '_exec_ipmitool')
def test_set_power_invalid_state(self):
self.assertRaises(exception.IronicException,
self.ipmi.set_power_state,
None,
self.node,
"fake state")
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is off\n"])
self.ipmi._exec_ipmitool("power on").AndReturn([])
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is off\n"])
self.ipmi._exec_ipmitool("power on").AndReturn([])
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is off\n"])
self.ipmi._exec_ipmitool("power on").AndReturn([])
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is off\n"])
def test_set_boot_device_ok(self):
info = ipmi._parse_control_info(self.node)
self.mox.StubOutWithMock(ipmi, '_exec_ipmitool')
ipmi._exec_ipmitool(info, "chassis bootdev pxe").\
AndReturn([None, None])
self.mox.ReplayAll()
self.ipmi.state = states.POWER_OFF
self.ipmi._power_on()
self.ipmi.set_boot_device(None, self.node, 'pxe')
self.mox.VerifyAll()
self.assertEqual(self.ipmi.state, states.ERROR)
self.assertEqual(self.ipmi.retries, 3)
def test_get_power_state(self):
self.mox.StubOutWithMock(self.ipmi, '_exec_ipmitool')
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is off\n"])
self.ipmi._exec_ipmitool("power status").AndReturn(
["Chassis Power is on\n"])
def test_set_boot_device_bad_device(self):
self.assertRaises(exception.InvalidParameterValue,
self.ipmi.set_boot_device,
None,
self.node,
'fake-device')
def test_reboot_ok(self):
info = ipmi._parse_control_info(self.node)
self.mox.StubOutWithMock(ipmi, '_power_off')
self.mox.StubOutWithMock(ipmi, '_power_on')
ipmi._power_off(info)
ipmi._power_on(info).AndReturn(states.POWER_ON)
self.mox.ReplayAll()
self.ipmi.state = states.POWER_OFF
pstate = self.ipmi.get_power_state()
self.assertEqual(pstate, states.POWER_OFF)
self.ipmi.state = states.POWER_ON
pstate = self.ipmi.get_power_state()
self.assertEqual(pstate, states.POWER_ON)
self.ipmi.reboot(None, self.node)
self.mox.VerifyAll()
def test_reboot(self):
# TODO(deva)
pass
def test_reboot_fail(self):
info = ipmi._parse_control_info(self.node)
self.mox.StubOutWithMock(ipmi, '_power_off')
self.mox.StubOutWithMock(ipmi, '_power_on')
def test_get_console_pid_path(self):
self.flags(terminal_pid_dir='/tmp')
path = ipmi._get_console_pid_path(self.ipmi.node_id)
self.assertEqual(path, '/tmp/%s.pid' % self.ipmi.node_id)
def test_console_pid(self):
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'w') as f:
f.write("12345\n")
self.mox.StubOutWithMock(ipmi, '_get_console_pid_path')
ipmi._get_console_pid_path(self.ipmi.node_id).AndReturn(path)
ipmi._power_off(info)
ipmi._power_on(info).AndReturn(states.ERROR)
self.mox.ReplayAll()
pid = ipmi._get_console_pid(self.ipmi.node_id)
utils.delete_if_exists(path)
self.assertRaises(exception.PowerStateFailure,
self.ipmi.reboot,
None,
self.node)
self.mox.VerifyAll()
self.assertEqual(pid, 12345)
def test_console_pid_nan(self):
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'w') as f:
f.write("hello world\n")
self.mox.StubOutWithMock(ipmi, '_get_console_pid_path')
ipmi._get_console_pid_path(self.ipmi.node_id).AndReturn(path)
self.mox.ReplayAll()
pid = ipmi._get_console_pid(self.ipmi.node_id)
utils.delete_if_exists(path)
self.mox.VerifyAll()
self.assertTrue(pid is None)
def test_console_pid_file_not_found(self):
pid_path = ipmi._get_console_pid_path(self.ipmi.node_id)
self.mox.StubOutWithMock(os.path, 'exists')
os.path.exists(pid_path).AndReturn(False)
self.mox.ReplayAll()
pid = ipmi._get_console_pid(self.ipmi.node_id)
self.mox.VerifyAll()
self.assertTrue(pid is None)