Merge "Adds an abstract locking layer"
This commit is contained in:
commit
b353ec1b3d
|
@ -0,0 +1,70 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
|
||||
from oslo_concurrency import lockutils
|
||||
import six
|
||||
|
||||
_LOCK_TEMPLATE = 'node-%s'
|
||||
_SEMAPHORES = lockutils.Semaphores()
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseLock(object):
|
||||
|
||||
@abc.abstractmethod
|
||||
def acquire(self, blocking=True, timeout=None):
|
||||
"""Acquire lock."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def release(self):
|
||||
"""Release lock."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_locked(self):
|
||||
"""Return lock status"""
|
||||
|
||||
|
||||
class InternalLock(BaseLock):
|
||||
"""Locking mechanism based on threading.Semaphore."""
|
||||
|
||||
def __init__(self, uuid):
|
||||
self._lock = lockutils.internal_lock(_LOCK_TEMPLATE % uuid,
|
||||
semaphores=_SEMAPHORES)
|
||||
self._locked = False
|
||||
|
||||
def acquire(self, blocking=True, timeout=None):
|
||||
# NOTE(kaifeng) timeout is only available on python3
|
||||
if not self._locked:
|
||||
self._locked = self._lock.acquire(blocking=blocking)
|
||||
return self._locked
|
||||
|
||||
def release(self):
|
||||
if self._locked:
|
||||
self._lock.release()
|
||||
self._locked = False
|
||||
|
||||
def is_locked(self):
|
||||
return self._locked
|
||||
|
||||
def __enter__(self):
|
||||
self._lock.acquire()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self._lock.release()
|
||||
|
||||
|
||||
def get_lock(uuid):
|
||||
return InternalLock(uuid)
|
|
@ -22,7 +22,6 @@ import operator
|
|||
|
||||
from automaton import exceptions as automaton_errors
|
||||
from ironicclient import exceptions
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_db.sqlalchemy import utils as db_utils
|
||||
from oslo_utils import excutils
|
||||
|
@ -34,26 +33,15 @@ from sqlalchemy.orm import exc as orm_errors
|
|||
|
||||
from ironic_inspector.common.i18n import _
|
||||
from ironic_inspector.common import ironic as ir_utils
|
||||
from ironic_inspector.common import locking
|
||||
from ironic_inspector import db
|
||||
from ironic_inspector import introspection_state as istate
|
||||
from ironic_inspector import utils
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
LOG = utils.getProcessingLogger(__name__)
|
||||
|
||||
|
||||
MACS_ATTRIBUTE = 'mac'
|
||||
_LOCK_TEMPLATE = 'node-%s'
|
||||
_SEMAPHORES = lockutils.Semaphores()
|
||||
|
||||
|
||||
def _get_lock(uuid):
|
||||
"""Get lock object for a given node UUID."""
|
||||
return lockutils.internal_lock(_LOCK_TEMPLATE % uuid,
|
||||
semaphores=_SEMAPHORES)
|
||||
|
||||
|
||||
class NodeInfo(object):
|
||||
|
@ -84,13 +72,12 @@ class NodeInfo(object):
|
|||
# equivalent to True actually.
|
||||
self._manage_boot = manage_boot if manage_boot is not None else True
|
||||
# This is a lock on a node UUID, not on a NodeInfo object
|
||||
self._lock = _get_lock(uuid)
|
||||
self._lock = locking.get_lock(uuid)
|
||||
# Whether lock was acquired using this NodeInfo object
|
||||
self._locked = False
|
||||
self._fsm = None
|
||||
|
||||
def __del__(self):
|
||||
if self._locked:
|
||||
if self._lock.is_locked():
|
||||
LOG.warning('BUG: node lock was not released by the moment '
|
||||
'node info object is deleted')
|
||||
self._lock.release()
|
||||
|
@ -112,12 +99,11 @@ class NodeInfo(object):
|
|||
return immediately.
|
||||
:returns: boolean value, whether lock was acquired successfully
|
||||
"""
|
||||
if self._locked:
|
||||
if self._lock.is_locked():
|
||||
return True
|
||||
|
||||
LOG.debug('Attempting to acquire lock', node_info=self)
|
||||
if self._lock.acquire(blocking):
|
||||
self._locked = True
|
||||
LOG.debug('Successfully acquired lock', node_info=self)
|
||||
return True
|
||||
else:
|
||||
|
@ -129,10 +115,9 @@ class NodeInfo(object):
|
|||
|
||||
Does nothing if lock was not acquired using this NodeInfo object.
|
||||
"""
|
||||
if self._locked:
|
||||
if self._lock.is_locked():
|
||||
LOG.debug('Successfully released lock', node_info=self)
|
||||
self._lock.release()
|
||||
self._locked = False
|
||||
|
||||
@property
|
||||
def version_id(self):
|
||||
|
@ -663,7 +648,7 @@ def release_lock(func):
|
|||
finally:
|
||||
# FIXME(milan) hacking the test cases to work
|
||||
# with release_lock.assert_called_once...
|
||||
if node_info._locked:
|
||||
if node_info._lock.is_locked():
|
||||
node_info.release_lock()
|
||||
return inner
|
||||
|
||||
|
@ -738,7 +723,7 @@ def delete_nodes_not_in_list(uuids):
|
|||
for uuid in inspector_uuids - uuids:
|
||||
LOG.warning('Node %s was deleted from Ironic, dropping from Ironic '
|
||||
'Inspector database', uuid)
|
||||
with _get_lock(uuid):
|
||||
with locking.get_lock(uuid):
|
||||
_delete_node(uuid)
|
||||
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ from oslo_utils import uuidutils
|
|||
import six
|
||||
|
||||
from ironic_inspector.common import ironic as ir_utils
|
||||
from ironic_inspector.common import locking
|
||||
from ironic_inspector import db
|
||||
from ironic_inspector import introspection_state as istate
|
||||
from ironic_inspector import node_cache
|
||||
|
@ -57,7 +58,7 @@ class TestNodeCache(test_base.NodeTest):
|
|||
(datetime.datetime.utcnow() - datetime.timedelta(seconds=60)
|
||||
< node.started_at <
|
||||
datetime.datetime.utcnow() + datetime.timedelta(seconds=60)))
|
||||
self.assertFalse(node._locked)
|
||||
self.assertFalse(node._lock.is_locked())
|
||||
|
||||
res = set(db.model_query(db.Node.uuid,
|
||||
db.Node.started_at).all())
|
||||
|
@ -102,12 +103,12 @@ class TestNodeCache(test_base.NodeTest):
|
|||
uuid=self.uuid).first()
|
||||
self.assertIsNone(row_option)
|
||||
|
||||
@mock.patch.object(node_cache, '_get_lock', autospec=True)
|
||||
@mock.patch.object(locking, 'get_lock', autospec=True)
|
||||
@mock.patch.object(node_cache, '_list_node_uuids')
|
||||
@mock.patch.object(node_cache, '_delete_node')
|
||||
def test_delete_nodes_not_in_list(self, mock__delete_node,
|
||||
mock__list_node_uuids,
|
||||
mock__get_lock_ctx):
|
||||
mock_get_lock):
|
||||
uuid2 = uuidutils.generate_uuid()
|
||||
uuids = {self.uuid}
|
||||
mock__list_node_uuids.return_value = {self.uuid, uuid2}
|
||||
|
@ -115,8 +116,8 @@ class TestNodeCache(test_base.NodeTest):
|
|||
with session.begin():
|
||||
node_cache.delete_nodes_not_in_list(uuids)
|
||||
mock__delete_node.assert_called_once_with(uuid2)
|
||||
mock__get_lock_ctx.assert_called_once_with(uuid2)
|
||||
mock__get_lock_ctx.return_value.__enter__.assert_called_once_with()
|
||||
mock_get_lock.assert_called_once_with(uuid2)
|
||||
mock_get_lock.return_value.__enter__.assert_called_once_with()
|
||||
|
||||
def test_active_macs(self):
|
||||
session = db.get_writer_session()
|
||||
|
@ -243,7 +244,7 @@ class TestNodeCacheFind(test_base.NodeTest):
|
|||
datetime.datetime.utcnow() - datetime.timedelta(seconds=60)
|
||||
< res.started_at <
|
||||
datetime.datetime.utcnow() + datetime.timedelta(seconds=1))
|
||||
self.assertTrue(res._locked)
|
||||
self.assertTrue(res._lock.is_locked())
|
||||
|
||||
def test_same_bmc_different_macs(self):
|
||||
uuid2 = uuidutils.generate_uuid()
|
||||
|
@ -274,7 +275,7 @@ class TestNodeCacheFind(test_base.NodeTest):
|
|||
datetime.datetime.utcnow() - datetime.timedelta(seconds=60)
|
||||
< res.started_at <
|
||||
datetime.datetime.utcnow() + datetime.timedelta(seconds=1))
|
||||
self.assertTrue(res._locked)
|
||||
self.assertTrue(res._lock.is_locked())
|
||||
|
||||
def test_macs_not_found(self):
|
||||
self.assertRaises(utils.Error, node_cache.find_node,
|
||||
|
@ -297,7 +298,7 @@ class TestNodeCacheFind(test_base.NodeTest):
|
|||
datetime.datetime.utcnow() - datetime.timedelta(seconds=60)
|
||||
< res.started_at <
|
||||
datetime.datetime.utcnow() + datetime.timedelta(seconds=1))
|
||||
self.assertTrue(res._locked)
|
||||
self.assertTrue(res._lock.is_locked())
|
||||
|
||||
def test_inconsistency(self):
|
||||
session = db.get_writer_session()
|
||||
|
@ -352,7 +353,7 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
|
|||
db.model_query(db.Attribute).count())
|
||||
self.assertEqual(1, db.model_query(db.Option).count())
|
||||
|
||||
@mock.patch.object(node_cache, '_get_lock', autospec=True)
|
||||
@mock.patch.object(locking, 'get_lock', autospec=True)
|
||||
@mock.patch.object(timeutils, 'utcnow')
|
||||
def test_ok(self, time_mock, get_lock_mock):
|
||||
time_mock.return_value = datetime.datetime.utcnow()
|
||||
|
@ -398,9 +399,9 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
|
|||
self.assertEqual([], db.model_query(db.Option).all())
|
||||
lock_mock.assert_called_once_with(mock.ANY, blocking=False)
|
||||
|
||||
@mock.patch.object(node_cache, '_get_lock', autospec=True)
|
||||
@mock.patch.object(locking, 'get_lock', autospec=True)
|
||||
@mock.patch.object(timeutils, 'utcnow')
|
||||
def test_timeout_active_state(self, time_mock, get_lock_mock):
|
||||
def test_timeout_active_state(self, time_mock, lock_mock):
|
||||
time_mock.return_value = self.started_at
|
||||
session = db.get_writer_session()
|
||||
CONF.set_override('timeout', 1)
|
||||
|
@ -452,7 +453,7 @@ class TestNodeCacheGetNode(test_base.NodeTest):
|
|||
self.assertEqual(started_at, info.started_at)
|
||||
self.assertIsNone(info.finished_at)
|
||||
self.assertIsNone(info.error)
|
||||
self.assertFalse(info._locked)
|
||||
self.assertFalse(info._lock.is_locked())
|
||||
|
||||
def test_not_found(self):
|
||||
self.assertRaises(utils.Error, node_cache.get_node,
|
||||
|
@ -475,7 +476,7 @@ class TestNodeCacheGetNode(test_base.NodeTest):
|
|||
self.assertEqual(started_at, info.started_at)
|
||||
self.assertIsNone(info.finished_at)
|
||||
self.assertIsNone(info.error)
|
||||
self.assertFalse(info._locked)
|
||||
self.assertFalse(info._lock.is_locked())
|
||||
ironic.node.get.assert_called_once_with('name')
|
||||
|
||||
|
||||
|
@ -520,7 +521,7 @@ class TestNodeInfoFinished(test_base.NodeTest):
|
|||
def test_release_lock(self):
|
||||
self.node_info.acquire_lock()
|
||||
self.node_info.finished(istate.Events.finish)
|
||||
self.assertFalse(self.node_info._locked)
|
||||
self.assertFalse(self.node_info._lock.is_locked())
|
||||
|
||||
|
||||
class TestNodeInfoOptions(test_base.NodeTest):
|
||||
|
@ -846,46 +847,47 @@ class TestNodeCacheGetByPath(test_base.NodeTest):
|
|||
self.assertRaises(KeyError, self.node_info.get_by_path, '/extra/foo')
|
||||
|
||||
|
||||
@mock.patch.object(node_cache, '_get_lock', autospec=True)
|
||||
class TestLock(test_base.NodeTest):
|
||||
def test_acquire(self, get_lock_mock):
|
||||
@mock.patch.object(locking.lockutils, 'internal_lock', autospec=True)
|
||||
class TestInternalLock(test_base.NodeTest):
|
||||
def test_acquire(self, lock_mock):
|
||||
node_info = node_cache.NodeInfo(self.uuid)
|
||||
self.addCleanup(node_info.release_lock)
|
||||
self.assertFalse(node_info._locked)
|
||||
get_lock_mock.assert_called_once_with(self.uuid)
|
||||
self.assertFalse(get_lock_mock.return_value.acquire.called)
|
||||
self.assertFalse(node_info._lock.is_locked())
|
||||
lock_mock.assert_called_once_with('node-{}'.format(self.uuid),
|
||||
semaphores=mock.ANY)
|
||||
self.assertFalse(lock_mock.return_value.acquire.called)
|
||||
|
||||
self.assertTrue(node_info.acquire_lock())
|
||||
self.assertTrue(node_info._locked)
|
||||
self.assertTrue(node_info._lock.is_locked())
|
||||
self.assertTrue(node_info.acquire_lock())
|
||||
self.assertTrue(node_info._locked)
|
||||
get_lock_mock.return_value.acquire.assert_called_once_with(True)
|
||||
self.assertTrue(node_info._lock.is_locked())
|
||||
lock_mock.return_value.acquire.assert_called_once_with(blocking=True)
|
||||
|
||||
def test_release(self, get_lock_mock):
|
||||
def test_release(self, lock_mock):
|
||||
node_info = node_cache.NodeInfo(self.uuid)
|
||||
node_info.acquire_lock()
|
||||
self.assertTrue(node_info._locked)
|
||||
self.assertTrue(node_info._lock.is_locked())
|
||||
node_info.release_lock()
|
||||
self.assertFalse(node_info._locked)
|
||||
self.assertFalse(node_info._lock.is_locked())
|
||||
node_info.release_lock()
|
||||
self.assertFalse(node_info._locked)
|
||||
get_lock_mock.return_value.acquire.assert_called_once_with(True)
|
||||
get_lock_mock.return_value.release.assert_called_once_with()
|
||||
self.assertFalse(node_info._lock.is_locked())
|
||||
lock_mock.return_value.acquire.assert_called_once_with(blocking=True)
|
||||
lock_mock.return_value.release.assert_called_once_with()
|
||||
|
||||
def test_acquire_non_blocking(self, get_lock_mock):
|
||||
def test_acquire_non_blocking(self, lock_mock):
|
||||
node_info = node_cache.NodeInfo(self.uuid)
|
||||
self.addCleanup(node_info.release_lock)
|
||||
self.assertFalse(node_info._locked)
|
||||
get_lock_mock.return_value.acquire.side_effect = iter([False, True])
|
||||
self.assertFalse(node_info._lock.is_locked())
|
||||
lock_mock.return_value.acquire.side_effect = iter([False, True])
|
||||
|
||||
self.assertFalse(node_info.acquire_lock(blocking=False))
|
||||
self.assertFalse(node_info._locked)
|
||||
self.assertFalse(node_info._lock.is_locked())
|
||||
self.assertTrue(node_info.acquire_lock(blocking=False))
|
||||
self.assertTrue(node_info._locked)
|
||||
self.assertTrue(node_info._lock.is_locked())
|
||||
self.assertTrue(node_info.acquire_lock(blocking=False))
|
||||
self.assertTrue(node_info._locked)
|
||||
get_lock_mock.return_value.acquire.assert_called_with(False)
|
||||
self.assertEqual(2, get_lock_mock.return_value.acquire.call_count)
|
||||
self.assertTrue(node_info._lock.is_locked())
|
||||
lock_mock.return_value.acquire.assert_called_with(blocking=False)
|
||||
self.assertEqual(2, lock_mock.return_value.acquire.call_count)
|
||||
|
||||
|
||||
@mock.patch.object(node_cache, 'add_node', autospec=True)
|
||||
|
@ -1168,7 +1170,7 @@ class TestFsmEvent(test_base.NodeStateTest):
|
|||
def test_unlock(self):
|
||||
@node_cache.release_lock
|
||||
def func(node_info):
|
||||
self.assertTrue(node_info._locked)
|
||||
self.assertTrue(node_info._lock.is_locked())
|
||||
|
||||
self.node_info.acquire_lock(blocking=True)
|
||||
with mock.patch.object(self.node_info, 'release_lock',
|
||||
|
@ -1179,7 +1181,7 @@ class TestFsmEvent(test_base.NodeStateTest):
|
|||
def test_unlock_unlocked(self):
|
||||
@node_cache.release_lock
|
||||
def func(node_info):
|
||||
self.assertFalse(node_info._locked)
|
||||
self.assertFalse(node_info._lock.is_locked())
|
||||
|
||||
self.node_info.release_lock()
|
||||
with mock.patch.object(self.node_info, 'release_lock',
|
||||
|
|
|
@ -79,8 +79,7 @@ class BaseProcessTest(BaseTest):
|
|||
self.addCleanup(self._cleanup_lock, self.node_info)
|
||||
|
||||
def _cleanup_lock(self, node_info):
|
||||
if node_info._locked:
|
||||
node_info.release_lock()
|
||||
node_info.release_lock()
|
||||
|
||||
|
||||
class TestProcess(BaseProcessTest):
|
||||
|
|
Loading…
Reference in New Issue