Merge "Deadlock prevention support in synchronize"
This commit is contained in:
commit
3c0aea5feb
@ -135,18 +135,43 @@ def synchronized_remove(glob_name, coordinator=COORDINATOR):
|
|||||||
coordinator.remove_lock(glob_name)
|
coordinator.remove_lock(glob_name)
|
||||||
|
|
||||||
|
|
||||||
def synchronized(lock_name: str,
|
def __acquire(lock, blocking, f_name):
|
||||||
|
"""Acquire a lock and return the time when it was acquired."""
|
||||||
|
t1 = timeutils.now()
|
||||||
|
name = utils.convert_str(lock.name)
|
||||||
|
LOG.debug('Acquiring lock "%s" by "%s"', name, f_name)
|
||||||
|
lock.acquire(blocking)
|
||||||
|
t2 = timeutils.now()
|
||||||
|
LOG.debug('Lock "%s" acquired by "%s" :: waited %0.3fs',
|
||||||
|
name, f_name, t2 - t1)
|
||||||
|
return t2
|
||||||
|
|
||||||
|
|
||||||
|
def __release(lock, acquired_time, f_name):
|
||||||
|
"""Release a lock ignoring exceptions."""
|
||||||
|
name = utils.convert_str(lock.name)
|
||||||
|
try:
|
||||||
|
lock.release()
|
||||||
|
held = timeutils.now() - acquired_time
|
||||||
|
LOG.debug('Lock "%s" released by "%s" :: held %0.3fs',
|
||||||
|
name, f_name, held)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error('Failed to release lock "%s": %s', name, e)
|
||||||
|
|
||||||
|
|
||||||
|
def synchronized(*lock_names: str,
|
||||||
blocking: bool = True,
|
blocking: bool = True,
|
||||||
coordinator: Coordinator = COORDINATOR) -> Callable:
|
coordinator: Coordinator = COORDINATOR) -> Callable:
|
||||||
"""Synchronization decorator.
|
"""Synchronization decorator.
|
||||||
|
|
||||||
:param str lock_name: Lock name.
|
:param str lock_names: Arbitrary number of Lock names.
|
||||||
:param blocking: If True, blocks until the lock is acquired.
|
:param blocking: If True, blocks until the lock is acquired.
|
||||||
If False, raises exception when not acquired. Otherwise,
|
If False, raises exception when not acquired. Otherwise,
|
||||||
the value is used as a timeout value and if lock is not acquired
|
the value is used as a timeout value and if lock is not acquired
|
||||||
after this number of seconds exception is raised.
|
after this number of seconds exception is raised. This is a keyword
|
||||||
|
only argument.
|
||||||
:param coordinator: Coordinator class to use when creating lock.
|
:param coordinator: Coordinator class to use when creating lock.
|
||||||
Defaults to the global coordinator.
|
Defaults to the global coordinator. This is a keyword only argument.
|
||||||
:raises tooz.coordination.LockAcquireFailed: if lock is not acquired
|
:raises tooz.coordination.LockAcquireFailed: if lock is not acquired
|
||||||
|
|
||||||
Decorating a method like so::
|
Decorating a method like so::
|
||||||
@ -175,37 +200,44 @@ def synchronized(lock_name: str,
|
|||||||
def foo(self, vol, snap):
|
def foo(self, vol, snap):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
Multiple locks can be requested simultaneously and the decorator will
|
||||||
|
reorder the names by rendered lock names to prevent potential deadlocks.
|
||||||
|
|
||||||
|
@synchronized('{f_name}-{vol.id}-{snap[name]}',
|
||||||
|
'{f_name}-{vol.id}.delete')
|
||||||
|
def foo(self, vol, snap):
|
||||||
|
...
|
||||||
|
|
||||||
Available field names are: decorated function parameters and
|
Available field names are: decorated function parameters and
|
||||||
`f_name` as a decorated function name.
|
`f_name` as a decorated function name.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@decorator.decorator
|
@decorator.decorator
|
||||||
def _synchronized(f, *a, **k) -> Callable:
|
def _synchronized(f, *a, **k) -> Callable:
|
||||||
call_args = inspect.getcallargs(f, *a, **k)
|
call_args = inspect.getcallargs(f, *a, **k)
|
||||||
call_args['f_name'] = f.__name__
|
call_args['f_name'] = f.__name__
|
||||||
lock = coordinator.get_lock(lock_name.format(**call_args))
|
|
||||||
name = utils.convert_str(lock.name)
|
# Prevent deadlocks not duplicating and sorting them by name to always
|
||||||
|
# acquire them in the same order.
|
||||||
|
names = sorted(set([name.format(**call_args) for name in lock_names]))
|
||||||
|
locks = [coordinator.get_lock(name) for name in names]
|
||||||
|
acquired_times = []
|
||||||
f_name = f.__name__
|
f_name = f.__name__
|
||||||
t1 = timeutils.now()
|
t1 = timeutils.now()
|
||||||
t2 = None
|
|
||||||
try:
|
try:
|
||||||
LOG.debug('Acquiring lock "%(name)s" by "%(f_name)s"',
|
if len(locks) > 1: # Don't pollute logs for single locks
|
||||||
{'name': name, 'f_name': f_name})
|
LOG.debug('Acquiring %s locks by %s', len(locks), f_name)
|
||||||
with lock(blocking):
|
|
||||||
t2 = timeutils.now()
|
for lock in locks:
|
||||||
LOG.debug('Lock "%(name)s" acquired by "%(f_name)s" :: '
|
acquired_times.append(__acquire(lock, blocking, f_name))
|
||||||
'waited %(wait)s',
|
|
||||||
{'name': name, 'f_name': f_name,
|
if len(locks) > 1:
|
||||||
'wait': "%0.3fs" % (t2 - t1)})
|
t = timeutils.now() - t1
|
||||||
return f(*a, **k)
|
LOG.debug('Acquired %s locks by %s in %0.3fs',
|
||||||
|
len(locks), f_name, t)
|
||||||
|
|
||||||
|
return f(*a, **k)
|
||||||
finally:
|
finally:
|
||||||
t3 = timeutils.now()
|
for lock, acquired_time in zip(locks, acquired_times):
|
||||||
if t2 is None:
|
__release(lock, acquired_time, f_name)
|
||||||
held_secs = "N/A"
|
|
||||||
else:
|
|
||||||
held_secs = "%0.3fs" % (t3 - t2)
|
|
||||||
LOG.debug(
|
|
||||||
'Lock "%(name)s" released by "%(f_name)s" :: held %(held)s',
|
|
||||||
{'name': name, 'f_name': f_name, 'held': held_secs})
|
|
||||||
|
|
||||||
return _synchronized
|
return _synchronized
|
||||||
|
@ -209,3 +209,71 @@ class CoordinationTestCase(test.TestCase):
|
|||||||
coordination.synchronized_remove(mock.sentinel.glob_name, coordinator)
|
coordination.synchronized_remove(mock.sentinel.glob_name, coordinator)
|
||||||
coordinator.remove_lock.assert_called_once_with(
|
coordinator.remove_lock.assert_called_once_with(
|
||||||
mock.sentinel.glob_name)
|
mock.sentinel.glob_name)
|
||||||
|
|
||||||
|
@mock.patch.object(coordination.COORDINATOR, 'get_lock')
|
||||||
|
def test_synchronized_multiple_templates(self, get_lock):
|
||||||
|
"""Test locks requested in the right order and duplicates removed."""
|
||||||
|
locks = ['lock-{f_name}-%s-{foo.val}-{bar[val]}' % i for i in range(3)]
|
||||||
|
expect = [f'lock-func-{i}-7-8' for i in range(3)]
|
||||||
|
|
||||||
|
@coordination.synchronized(locks[1], locks[0], locks[1], locks[2])
|
||||||
|
def func(foo, bar):
|
||||||
|
pass
|
||||||
|
|
||||||
|
foo = mock.Mock(val=7)
|
||||||
|
bar = mock.MagicMock()
|
||||||
|
bar.__getitem__.return_value = 8
|
||||||
|
func(foo, bar)
|
||||||
|
self.assertEqual(len(expect), get_lock.call_count)
|
||||||
|
get_lock.assert_has_calls([mock.call(lock) for lock in expect])
|
||||||
|
|
||||||
|
self.assertEqual(len(expect), get_lock.return_value.acquire.call_count)
|
||||||
|
get_lock.return_value.acquire.assert_has_calls(
|
||||||
|
[mock.call(True)] * len(expect))
|
||||||
|
|
||||||
|
self.assertEqual(['foo', 'bar'], inspect.getfullargspec(func)[0])
|
||||||
|
|
||||||
|
@mock.patch('oslo_utils.timeutils.now', side_effect=[1, 2])
|
||||||
|
def test___acquire(self, mock_now):
|
||||||
|
lock = mock.Mock()
|
||||||
|
# Using getattr to avoid AttributeError: module 'cinder.coordination'
|
||||||
|
# has no attribute '_CoordinationTestCase__acquire'
|
||||||
|
res = getattr(coordination, '__acquire')(lock, mock.sentinel.blocking,
|
||||||
|
mock.sentinel.f_name)
|
||||||
|
self.assertEqual(2, res)
|
||||||
|
self.assertEqual(2, mock_now.call_count)
|
||||||
|
mock_now.assert_has_calls([mock.call(), mock.call()])
|
||||||
|
lock.acquire.assert_called_once_with(mock.sentinel.blocking)
|
||||||
|
|
||||||
|
@mock.patch('oslo_utils.timeutils.now')
|
||||||
|
def test___acquire_propagates_exception(self, mock_now):
|
||||||
|
lock = mock.Mock()
|
||||||
|
lock.acquire.side_effect = ValueError
|
||||||
|
# Using getattr to avoid AttributeError: module 'cinder.coordination'
|
||||||
|
# has no attribute '_CoordinationTestCase__acquire'
|
||||||
|
self.assertRaises(ValueError,
|
||||||
|
getattr(coordination, '__acquire'),
|
||||||
|
lock, mock.sentinel.blocking, mock.sentinel.f_name)
|
||||||
|
mock_now.assert_called_once_with()
|
||||||
|
lock.acquire.assert_called_once_with(mock.sentinel.blocking)
|
||||||
|
|
||||||
|
@mock.patch('oslo_utils.timeutils.now', return_value=2)
|
||||||
|
def test___release(self, mock_now):
|
||||||
|
lock = mock.Mock()
|
||||||
|
# Using getattr to avoid AttributeError: module 'cinder.coordination'
|
||||||
|
# has no attribute '_CoordinationTestCase__release'
|
||||||
|
getattr(coordination, '__release')(lock, 1, mock.sentinel.f_name)
|
||||||
|
|
||||||
|
mock_now.assert_called_once_with()
|
||||||
|
lock.release.assert_called_once_with()
|
||||||
|
|
||||||
|
@mock.patch('oslo_utils.timeutils.now')
|
||||||
|
def test___release_ignores_exception(self, mock_now):
|
||||||
|
lock = mock.Mock()
|
||||||
|
lock.release.side_effect = ValueError
|
||||||
|
# Using getattr to avoid AttributeError: module 'cinder.coordination'
|
||||||
|
# has no attribute '_CoordinationTestCase__release'
|
||||||
|
getattr(coordination, '__release')(lock, 1, mock.sentinel.f_name)
|
||||||
|
|
||||||
|
mock_now.assert_not_called()
|
||||||
|
lock.release.assert_called_once_with()
|
||||||
|
@ -642,6 +642,10 @@ race conditions.
|
|||||||
Global locking functionality is provided by the `synchronized` decorator from
|
Global locking functionality is provided by the `synchronized` decorator from
|
||||||
`cinder.coordination`.
|
`cinder.coordination`.
|
||||||
|
|
||||||
|
.. attention:: Optional `blocking` and `coordinator` arguments to the
|
||||||
|
`synchronized` decorator are **keyword** arguments only and cannot be passed
|
||||||
|
as positional arguments.
|
||||||
|
|
||||||
This method is more advanced than the one used for the `Process locks`_ and the
|
This method is more advanced than the one used for the `Process locks`_ and the
|
||||||
`Node locks`_, as the name supports templates. For the template we have all
|
`Node locks`_, as the name supports templates. For the template we have all
|
||||||
the method parameters as well as `f_name` that represents that name of the
|
the method parameters as well as `f_name` that represents that name of the
|
||||||
@ -673,6 +677,37 @@ attribute from `self`, and a recursive reference in the `snapshot` parameter.
|
|||||||
@coordination.synchronized('{self.driver_prefix}-{snapshot.volume.id}')
|
@coordination.synchronized('{self.driver_prefix}-{snapshot.volume.id}')
|
||||||
def create_snapshot(self, snapshot):
|
def create_snapshot(self, snapshot):
|
||||||
|
|
||||||
|
Some drivers may require multiple locks for a critical section, which could
|
||||||
|
potentially create deadlocks. Like in the following example, where `PowerMax`
|
||||||
|
method `move_volume_between_storage_groups` creates 2 locks:
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
@coordination.synchronized(
|
||||||
|
"emc-sg-{source_storagegroup_name}-{serial_number}")
|
||||||
|
@coordination.synchronized(
|
||||||
|
"emc-sg-{target_storagegroup_name}-{serial_number}")
|
||||||
|
def move_volume_between_storage_groups(
|
||||||
|
self, serial_number, device_id, source_storagegroup_name,
|
||||||
|
target_storagegroup_name, extra_specs, force=False,
|
||||||
|
parent_sg=None):
|
||||||
|
|
||||||
|
That code can result in a deadlock if 2 opposite requests come in concurrently
|
||||||
|
and their first lock acquisition interleaves.
|
||||||
|
|
||||||
|
The solution is calling the `synchronized` decorator with both lock names and
|
||||||
|
let it resolve the acquire ordering issue for us. The right code would be:
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
@coordination.synchronized(
|
||||||
|
"emc-sg-{source_storagegroup_name}-{serial_number}",
|
||||||
|
"emc-sg-{target_storagegroup_name}-{serial_number}")
|
||||||
|
def move_volume_between_storage_groups(
|
||||||
|
self, serial_number, device_id, source_storagegroup_name,
|
||||||
|
target_storagegroup_name, extra_specs, force=False,
|
||||||
|
parent_sg=None):
|
||||||
|
|
||||||
Internally Cinder uses the `Tooz library`_ to provide the distributed locking.
|
Internally Cinder uses the `Tooz library`_ to provide the distributed locking.
|
||||||
By default, this library is configured for Active-Passive deployments, where it
|
By default, this library is configured for Active-Passive deployments, where it
|
||||||
uses file locks equivalent to those used for `Node locks`_.
|
uses file locks equivalent to those used for `Node locks`_.
|
||||||
|
Loading…
Reference in New Issue
Block a user