Fix task_manager acquire post landing for c4f2f26ed

In original implementation we used context lib with try/finally
to make sure release_resources will be called even if there was an
exception inside acquire_resources. With c4f2f26ed we broke that
functionality, so this patch intended to fix this.

Closes-Bug: #1279992
Change-Id: I04abc67aedc226bc126b1d93901c71bcd8a5bf15
This commit is contained in:
Max Lobur 2014-02-14 00:53:13 +02:00
parent 106e6e157a
commit aa6dcc9f3f
4 changed files with 92 additions and 60 deletions

View File

@ -204,8 +204,7 @@ class ConductorManager(service.PeriodicService):
"The desired new state is %(state)s.")
% {'node': node_id, 'state': new_state})
task = task_manager.TaskManager(context)
task.acquire_resources(node_id, shared=False)
task = task_manager.TaskManager(context, node_id, shared=False)
try:
# Start requested action in the background.

View File

@ -58,8 +58,7 @@ approach is to use manager._spawn_worker method and release resources using
link method of the returned thread object.
For example (somewhere inside conductor manager)::
task = task_manager.TaskManager(context)
task.acquire_resources(node_id, shared=False)
task = task_manager.TaskManager(context, node_id, shared=False)
try:
# Start requested action in the background.
@ -94,6 +93,8 @@ multi-node tasks more easily. Once implemented, it might look like this::
"""
from oslo.config import cfg
from ironic.openstack.common import excutils
from ironic.common import exception
from ironic.conductor import resource_manager
from ironic.db import api as dbapi
@ -128,21 +129,20 @@ def acquire(context, node_ids, shared=False, driver_name=None):
:returns: An instance of :class:`TaskManager`.
"""
mgr = TaskManager(context)
mgr.acquire_resources(node_ids, shared, driver_name)
return mgr
return TaskManager(context, node_ids, shared, driver_name)
class TaskManager(object):
"""Context manager for tasks."""
def __init__(self, context):
def __init__(self, context, node_ids, shared=False, driver_name=None):
self.context = context
self.resources = []
self.shared = False
self.shared = shared
self.dbapi = dbapi.get_instance()
self._acquire_resources(node_ids, driver_name)
def acquire_resources(self, node_ids, shared=False, driver_name=None):
def _acquire_resources(self, node_ids, driver_name=None):
"""Acquire a lock on one or more Nodes.
Acquire a lock atomically on a non-empty set of nodes. The lock
@ -156,12 +156,6 @@ class TaskManager(object):
:param driver_name: Name of Driver. Default: None.
"""
# Do not allow multiple acquire calls.
if self.resources:
raise exception.IronicException(
_("Task manager already has resources."))
self.shared = shared
# instead of generating an exception, DTRT and convert to a list
if not isinstance(node_ids, list):
@ -169,23 +163,34 @@ class TaskManager(object):
if not self.shared:
self.dbapi.reserve_nodes(CONF.host, node_ids)
for node_id in node_ids:
node_mgr = resource_manager.NodeManager.acquire(node_id, self,
driver_name)
self.resources.append(node_mgr)
try:
for node_id in node_ids:
node_mgr = resource_manager.NodeManager.acquire(
node_id, self, driver_name)
self.resources.append(node_mgr)
except Exception:
with excutils.save_and_reraise_exception():
# Revert db changes for all the resources.
if not self.shared:
self.dbapi.release_nodes(CONF.host, node_ids)
# Release NodeManager resources which has been already loaded.
for node_id in [r.id for r in self.resources]:
resource_manager.NodeManager.release(node_id, self)
def release_resources(self):
"""Release all the resources acquired for this TaskManager."""
# Do not allow multiple release calls.
if not self.resources:
raise exception.IronicException(
_("Task manager doesn't have resources to release."))
# Nothing to release.
return
node_ids = [r.id for r in self.resources]
for node_id in node_ids:
resource_manager.NodeManager.release(node_id, self)
if not self.shared:
self.dbapi.release_nodes(CONF.host, node_ids)
self.resources = []
@property

View File

@ -33,7 +33,6 @@ class NodePowerActionTestCase(base.DbTestCase):
self.context = context.get_admin_context()
self.dbapi = dbapi.get_instance()
self.driver = mgr_utils.get_mocked_node_manager()
self.task = task_manager.TaskManager(self.context)
def test_node_power_action_power_on(self):
"""Test node_power_action to turn node power on."""
@ -41,13 +40,13 @@ class NodePowerActionTestCase(base.DbTestCase):
driver='fake',
power_state=states.POWER_OFF)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
task = task_manager.TaskManager(self.context, node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_OFF
conductor_utils.node_power_action(self.task, self.task.node,
conductor_utils.node_power_action(task, task.node,
states.POWER_ON)
node.refresh(self.context)
@ -62,13 +61,13 @@ class NodePowerActionTestCase(base.DbTestCase):
driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
task = task_manager.TaskManager(self.context, node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_ON
conductor_utils.node_power_action(self.task, self.task.node,
conductor_utils.node_power_action(task, task.node,
states.POWER_OFF)
node.refresh(self.context)
@ -83,10 +82,10 @@ class NodePowerActionTestCase(base.DbTestCase):
driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
task = task_manager.TaskManager(self.context, node.uuid)
with mock.patch.object(self.driver.power, 'reboot') as reboot_mock:
conductor_utils.node_power_action(self.task, self.task.node,
conductor_utils.node_power_action(task, task.node,
states.REBOOT)
node.refresh(self.context)
@ -103,7 +102,7 @@ class NodePowerActionTestCase(base.DbTestCase):
driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
task = task_manager.TaskManager(self.context, node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
@ -111,8 +110,8 @@ class NodePowerActionTestCase(base.DbTestCase):
self.assertRaises(exception.InvalidParameterValue,
conductor_utils.node_power_action,
self.task,
self.task.node,
task,
task.node,
"INVALID_POWER_STATE")
node.refresh(self.context)
@ -122,7 +121,7 @@ class NodePowerActionTestCase(base.DbTestCase):
self.assertIsNotNone(node['last_error'])
# last_error is cleared when a new transaction happens
conductor_utils.node_power_action(self.task, self.task.node,
conductor_utils.node_power_action(task, task.node,
states.POWER_OFF)
node.refresh(self.context)
self.assertEqual(node['power_state'], states.POWER_OFF)
@ -141,9 +140,9 @@ class NodePowerActionTestCase(base.DbTestCase):
power_state=states.POWER_ON,
target_power_state=states.POWER_OFF)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
task = task_manager.TaskManager(self.context, node.uuid)
conductor_utils.node_power_action(self.task, self.task.node,
conductor_utils.node_power_action(task, task.node,
states.POWER_OFF)
node.refresh(self.context)
@ -160,7 +159,7 @@ class NodePowerActionTestCase(base.DbTestCase):
last_error='anything but None',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
task = task_manager.TaskManager(self.context, node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
@ -168,7 +167,7 @@ class NodePowerActionTestCase(base.DbTestCase):
with mock.patch.object(self.driver.power, 'set_power_state') \
as set_power_mock:
conductor_utils.node_power_action(self.task, self.task.node,
conductor_utils.node_power_action(task, task.node,
states.POWER_ON)
node.refresh(self.context)
@ -187,7 +186,7 @@ class NodePowerActionTestCase(base.DbTestCase):
driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
task = task_manager.TaskManager(self.context, node.uuid)
with mock.patch.object(self.driver.power, 'validate') \
as validate_mock:
@ -196,8 +195,8 @@ class NodePowerActionTestCase(base.DbTestCase):
self.assertRaises(exception.InvalidParameterValue,
conductor_utils.node_power_action,
self.task,
self.task.node,
task,
task.node,
states.POWER_ON)
node.refresh(self.context)
@ -214,7 +213,7 @@ class NodePowerActionTestCase(base.DbTestCase):
driver='fake',
power_state=states.POWER_OFF)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
task = task_manager.TaskManager(self.context, node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
@ -226,8 +225,8 @@ class NodePowerActionTestCase(base.DbTestCase):
self.assertRaises(
exception.IronicException,
conductor_utils.node_power_action,
self.task,
self.task.node,
task,
task.node,
states.POWER_ON)
node.refresh(self.context)

View File

@ -18,10 +18,12 @@
"""Tests for :class:`ironic.conductor.task_manager`."""
import mock
from testtools import matchers
from ironic.common import exception
from ironic.common import utils as ironic_utils
from ironic.conductor import resource_manager
from ironic.conductor import task_manager
from ironic.db import api as dbapi
from ironic.openstack.common import context
@ -58,14 +60,20 @@ class TaskManagerTestCase(base.DbTestCase):
self.uuids.sort()
def test_get_one_node(self):
uuids = [self.uuids[0]]
node_uuid = self.uuids[0]
self.config(host='test-host')
with task_manager.acquire(self.context, uuids) as task:
with task_manager.acquire(self.context, node_uuid) as task:
node = task.resources[0].node
self.assertEqual(uuids[0], node.uuid)
self.assertEqual(node_uuid, node.uuid)
self.assertEqual('test-host', node.reservation)
# Check that db reservation is set.
node.refresh(self.context)
self.assertEqual('test-host', node.reservation)
# Check that resource has been cached in NodeManager
self.assertIsNotNone(
resource_manager.NodeManager._nodes.get(node_uuid))
def test_get_many_nodes(self):
uuids = self.uuids[1:3]
@ -87,19 +95,6 @@ class TaskManagerTestCase(base.DbTestCase):
more_uuids) as another_task:
self.assertThat(another_task, ContainsUUIDs(more_uuids))
def test_get_locked_node(self):
uuids = self.uuids[0:2]
def _lock_again(u):
with task_manager.acquire(self.context, u):
raise exception.IronicException("Acquired lock twice.")
with task_manager.acquire(self.context, uuids) as task:
self.assertThat(task, ContainsUUIDs(uuids))
self.assertRaises(exception.NodeLocked,
_lock_again,
uuids)
def test_get_shared_lock(self):
uuids = self.uuids[0:2]
@ -117,6 +112,40 @@ class TaskManagerTestCase(base.DbTestCase):
shared=True) as inner_task:
self.assertThat(inner_task, ContainsUUIDs(uuids))
def test_get_one_node_driver_load_exception(self):
node_uuid = self.uuids[0]
with mock.patch.object(resource_manager.NodeManager, 'acquire') \
as acquire_mock:
acquire_mock.side_effect = exception.DriverNotFound(
driver_name='test'
)
self.assertRaises(exception.DriverNotFound,
task_manager.TaskManager,
self.context, node_uuid)
# Check that db node reservation is not set.
node = self.dbapi.get_node(node_uuid)
self.assertIsNone(node.reservation)
# Check that resource has not been cached in NodeManager
self.assertNotIn(node.uuid, resource_manager.NodeManager._nodes)
def test_get_one_node_locked_exception(self):
node_uuid = self.uuids[0]
with mock.patch.object(self.dbapi, 'reserve_nodes') as reserve_mock:
reserve_mock.side_effect = exception.NodeLocked(node='test',
host='host1')
self.assertRaises(exception.NodeLocked,
task_manager.TaskManager,
self.context, node_uuid)
# Check that db node reservation is not set.
node = self.dbapi.get_node(node_uuid)
self.assertIsNone(node.reservation)
# Check that resource has not been cached in NodeManager
self.assertNotIn(node.uuid, resource_manager.NodeManager._nodes)
class ExclusiveLockDecoratorTestCase(base.DbTestCase):