Allow upgrading shared lock to an exclusive one
New task method upgrade_lock is a simpler and nicer way of doing: if task.shared: task = task_manager.acquire(task.context, task.node.id) This is useful for e.g. periodic tasks to start with a shared lock and upgrade it if any changes are needed. One more use case is to allow vendor passthru implementations to decide if they actually need an exclusive lock instead of just providing them it by default. Both use cases are not covered by this change. Change-Id: I2c019f2855b61330b5b18cee34daa5f399a41f2c
This commit is contained in:
parent
a7e81f17bd
commit
65f3a31b94
|
@ -182,26 +182,11 @@ class TaskManager(object):
|
|||
|
||||
self.context = context
|
||||
self.node = None
|
||||
self.node_id = node_id
|
||||
self.shared = shared
|
||||
|
||||
self.fsm = states.machine.copy()
|
||||
self._purpose = purpose
|
||||
self._debug_timer = time.time()
|
||||
|
||||
# NodeLocked exceptions can be annoying. Let's try to alleviate
|
||||
# some of that pain by retrying our lock attempts. The retrying
|
||||
# module expects a wait_fixed value in milliseconds.
|
||||
@retrying.retry(
|
||||
retry_on_exception=lambda e: isinstance(e, exception.NodeLocked),
|
||||
stop_max_attempt_number=CONF.conductor.node_locked_retry_attempts,
|
||||
wait_fixed=CONF.conductor.node_locked_retry_interval * 1000)
|
||||
def reserve_node():
|
||||
self.node = objects.Node.reserve(context, CONF.host, node_id)
|
||||
LOG.debug("Node %(node)s successfully reserved for %(purpose)s "
|
||||
"(took %(time).2f seconds)",
|
||||
{'node': node_id, 'purpose': purpose,
|
||||
'time': time.time() - self._debug_timer})
|
||||
self._debug_timer = time.time()
|
||||
|
||||
try:
|
||||
LOG.debug("Attempting to get %(type)s lock on node %(node)s (for "
|
||||
|
@ -209,8 +194,9 @@ class TaskManager(object):
|
|||
{'type': 'shared' if shared else 'exclusive',
|
||||
'node': node_id, 'purpose': purpose})
|
||||
if not self.shared:
|
||||
reserve_node()
|
||||
self._lock()
|
||||
else:
|
||||
self._debug_timer = time.time()
|
||||
self.node = objects.Node.get(context, node_id)
|
||||
self.ports = objects.Port.list_by_node_id(context, self.node.id)
|
||||
self.driver = driver_factory.get_driver(driver_name or
|
||||
|
@ -228,6 +214,42 @@ class TaskManager(object):
|
|||
with excutils.save_and_reraise_exception():
|
||||
self.release_resources()
|
||||
|
||||
def _lock(self):
|
||||
self._debug_timer = time.time()
|
||||
|
||||
# NodeLocked exceptions can be annoying. Let's try to alleviate
|
||||
# some of that pain by retrying our lock attempts. The retrying
|
||||
# module expects a wait_fixed value in milliseconds.
|
||||
@retrying.retry(
|
||||
retry_on_exception=lambda e: isinstance(e, exception.NodeLocked),
|
||||
stop_max_attempt_number=CONF.conductor.node_locked_retry_attempts,
|
||||
wait_fixed=CONF.conductor.node_locked_retry_interval * 1000)
|
||||
def reserve_node():
|
||||
self.node = objects.Node.reserve(self.context, CONF.host,
|
||||
self.node_id)
|
||||
LOG.debug("Node %(node)s successfully reserved for %(purpose)s "
|
||||
"(took %(time).2f seconds)",
|
||||
{'node': self.node_id, 'purpose': self._purpose,
|
||||
'time': time.time() - self._debug_timer})
|
||||
self._debug_timer = time.time()
|
||||
|
||||
reserve_node()
|
||||
|
||||
def upgrade_lock(self):
|
||||
"""Upgrade a shared lock to an exclusive lock.
|
||||
|
||||
Also reloads node object from the database.
|
||||
Does nothing if lock is already exclusive.
|
||||
"""
|
||||
if self.shared:
|
||||
LOG.debug('Upgrading shared lock on node %(uuid)s for %(purpose)s '
|
||||
'to an exclusive one (shared lock was held %(time).2f '
|
||||
'seconds)',
|
||||
{'uuid': self.node.uuid, 'purpose': self._purpose,
|
||||
'time': time.time() - self._debug_timer})
|
||||
self._lock()
|
||||
self.shared = False
|
||||
|
||||
def spawn_after(self, _spawn_method, *args, **kwargs):
|
||||
"""Call this to spawn a thread to complete the task.
|
||||
|
||||
|
|
|
@ -299,6 +299,34 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
|
|||
get_ports_mock.assert_called_once_with(self.context, self.node.id)
|
||||
get_driver_mock.assert_called_once_with(self.node.driver)
|
||||
|
||||
def test_upgrade_lock(self, get_ports_mock, get_driver_mock,
|
||||
reserve_mock, release_mock, node_get_mock):
|
||||
node_get_mock.return_value = self.node
|
||||
reserve_mock.return_value = self.node
|
||||
with task_manager.TaskManager(self.context, 'fake-node-id',
|
||||
shared=True) as task:
|
||||
self.assertEqual(self.context, task.context)
|
||||
self.assertEqual(self.node, task.node)
|
||||
self.assertEqual(get_ports_mock.return_value, task.ports)
|
||||
self.assertEqual(get_driver_mock.return_value, task.driver)
|
||||
self.assertTrue(task.shared)
|
||||
self.assertFalse(reserve_mock.called)
|
||||
|
||||
task.upgrade_lock()
|
||||
self.assertFalse(task.shared)
|
||||
# second upgrade does nothing
|
||||
task.upgrade_lock()
|
||||
self.assertFalse(task.shared)
|
||||
|
||||
# make sure reserve() was called only once
|
||||
reserve_mock.assert_called_once_with(self.context, self.host,
|
||||
'fake-node-id')
|
||||
release_mock.assert_called_once_with(self.context, self.host,
|
||||
self.node.id)
|
||||
node_get_mock.assert_called_once_with(self.context, 'fake-node-id')
|
||||
get_ports_mock.assert_called_once_with(self.context, self.node.id)
|
||||
get_driver_mock.assert_called_once_with(self.node.driver)
|
||||
|
||||
def test_spawn_after(self, get_ports_mock, get_driver_mock,
|
||||
reserve_mock, release_mock, node_get_mock):
|
||||
thread_mock = mock.Mock(spec_set=['link', 'cancel'])
|
||||
|
|
Loading…
Reference in New Issue