Simplify locking code
NodeInfo accepts a lock instance when instantiated, but that lock is only used by the node clean-up periodic task, and supporting it adds needless complexity to the lock abstraction. This patch moves node locking out of get_node. _get_lock_ctx is removed because, in current usage, it only duplicates _get_lock.

Change-Id: I4e946942ce684b27062d1a42c68faf1d92569489
parent b8d1bda4c6
commit d57179411d
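The caller-facing effect, sketched below using names taken from this diff (the timeout handling is abbreviated for illustration, not the full clean_up implementation): node_cache.get_node() no longer accepts a locked flag or hands back an already-locked NodeInfo; callers that need exclusion now acquire the per-UUID lock explicitly, and non-blockingly, via NodeInfo.acquire_lock().

    # Before: get_node(locked=True) returned a NodeInfo already holding the lock.
    node_info = node_cache.get_node(uuid, locked=True)
    try:
        node_info.finished(istate.Events.timeout, error='Introspection timeout')
    finally:
        node_info.release_lock()

    # After: fetching the cache entry and locking it are separate, explicit steps.
    node_info = node_cache.get_node(uuid)
    if node_info.acquire_lock(blocking=False):
        try:
            node_info.finished(istate.Events.timeout, error='Introspection timeout')
        finally:
            node_info.release_lock()
    else:
        # Another operation holds the lock; skip now, retry on the next run.
        LOG.info('Failed to acquire lock when updating node state',
                 node_info=node_info)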
@@ -133,7 +133,7 @@ def abort(node_id, token=None):
     """
     LOG.debug('Aborting introspection for node %s', node_id)
     ironic = ir_utils.get_client(token)
-    node_info = node_cache.get_node(node_id, ironic=ironic, locked=False)
+    node_info = node_cache.get_node(node_id, ironic=ironic)
 
     # check pending operations
     locked = node_info.acquire_lock(blocking=False)
@@ -56,11 +56,6 @@ def _get_lock(uuid):
                                    semaphores=_SEMAPHORES)
 
 
-def _get_lock_ctx(uuid):
-    """Get context manager yielding a lock object for a given node UUID."""
-    return lockutils.lock(_LOCK_TEMPLATE % uuid, semaphores=_SEMAPHORES)
-
-
 class NodeInfo(object):
     """Record about a node in the cache.
 
@@ -71,7 +66,7 @@ class NodeInfo(object):
 
     def __init__(self, uuid, version_id=None, state=None, started_at=None,
                  finished_at=None, error=None, node=None, ports=None,
-                 ironic=None, lock=None, manage_boot=True):
+                 ironic=None, manage_boot=True):
         self.uuid = uuid
         self.started_at = started_at
         self.finished_at = finished_at
@@ -89,9 +84,9 @@ class NodeInfo(object):
         # equivalent to True actually.
         self._manage_boot = manage_boot if manage_boot is not None else True
         # This is a lock on a node UUID, not on a NodeInfo object
-        self._lock = lock if lock is not None else _get_lock(uuid)
+        self._lock = _get_lock(uuid)
         # Whether lock was acquired using this NodeInfo object
-        self._locked = lock is not None
+        self._locked = False
         self._fsm = None
 
     def __del__(self):
@@ -319,12 +314,12 @@ class NodeInfo(object):
         self._attributes = None
 
     @classmethod
-    def from_row(cls, row, ironic=None, lock=None, node=None):
+    def from_row(cls, row, ironic=None, node=None):
         """Construct NodeInfo from a database row."""
         fields = {key: row[key]
                   for key in ('uuid', 'version_id', 'state', 'started_at',
                               'finished_at', 'error', 'manage_boot')}
-        return cls(ironic=ironic, lock=lock, node=node, **fields)
+        return cls(ironic=ironic, node=node, **fields)
 
     def invalidate_cache(self):
         """Clear all cached info, so that it's reloaded next time."""
@@ -743,7 +738,7 @@ def delete_nodes_not_in_list(uuids):
     for uuid in inspector_uuids - uuids:
         LOG.warning('Node %s was deleted from Ironic, dropping from Ironic '
                     'Inspector database', uuid)
-        with _get_lock_ctx(uuid):
+        with _get_lock(uuid):
             _delete_node(uuid)
 
 
@@ -783,12 +778,11 @@ def _list_node_uuids():
     return {x.uuid for x in db.model_query(db.Node.uuid)}
 
 
-def get_node(node_id, ironic=None, locked=False):
+def get_node(node_id, ironic=None):
     """Get node from cache.
 
     :param node_id: node UUID or name.
     :param ironic: optional ironic client instance
-    :param locked: if True, get a lock on node before fetching its data
     :returns: structure NodeInfo.
     """
     if uuidutils.is_uuid_like(node_id):
@@ -798,22 +792,11 @@ def get_node(node_id, ironic=None, locked=False):
         node = ir_utils.get_node(node_id, ironic=ironic)
         uuid = node.uuid
 
-    if locked:
-        lock = _get_lock(uuid)
-        lock.acquire()
-    else:
-        lock = None
-
-    try:
-        row = db.model_query(db.Node).filter_by(uuid=uuid).first()
-        if row is None:
-            raise utils.Error(_('Could not find node %s in cache') % uuid,
-                              code=404)
-        return NodeInfo.from_row(row, ironic=ironic, lock=lock, node=node)
-    except Exception:
-        with excutils.save_and_reraise_exception():
-            if lock is not None:
-                lock.release()
+    row = db.model_query(db.Node).filter_by(uuid=uuid).first()
+    if row is None:
+        raise utils.Error(_('Could not find node %s in cache') % uuid,
+                          code=404)
+    return NodeInfo.from_row(row, ironic=ironic, node=node)
 
 
 def find_node(**attributes):
@@ -914,22 +897,28 @@ def clean_up():
         return []
 
     LOG.error('Introspection for nodes %s has timed out', uuids)
+    locked_uuids = []
     for u in uuids:
-        node_info = get_node(u, locked=True)
-        try:
-            if node_info.finished_at or node_info.started_at > threshold:
-                continue
-            if node_info.state != istate.States.waiting:
-                LOG.error('Something went wrong, timeout occurred '
-                          'while introspection in "%s" state',
-                          node_info.state,
-                          node_info=node_info)
-            node_info.finished(
-                istate.Events.timeout, error='Introspection timeout')
-        finally:
-            node_info.release_lock()
+        node_info = get_node(u)
+        if node_info.acquire_lock(blocking=False):
+            try:
+                if node_info.finished_at or node_info.started_at > threshold:
+                    continue
+                if node_info.state != istate.States.waiting:
+                    LOG.error('Something went wrong, timeout occurred '
+                              'while introspection in "%s" state',
+                              node_info.state,
+                              node_info=node_info)
+                node_info.finished(
+                    istate.Events.timeout, error='Introspection timeout')
+                locked_uuids.append(u)
+            finally:
+                node_info.release_lock()
+        else:
+            LOG.info('Failed to acquire lock when updating node state',
+                     node_info=node_info)
 
-    return uuids
+    return locked_uuids
 
 
 def create_node(driver, ironic=None, **attributes):
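Note the contract change in clean_up() above: it now returns only the UUIDs it actually transitioned on timeout (locked_uuids), so a node whose lock is held by a concurrent operation is skipped with a log message and picked up again on the next periodic run, rather than blocking the whole clean-up pass.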
@@ -315,7 +315,7 @@ def reapply(node_uuid, data=None):
 
     LOG.debug('Processing re-apply introspection request for node '
               'UUID: %s', node_uuid)
-    node_info = node_cache.get_node(node_uuid, locked=False)
+    node_info = node_cache.get_node(node_uuid)
     if not node_info.acquire_lock(blocking=False):
         # Note (mkovacik): it should be sufficient to check data
         # presence & locking. If either introspection didn't start
@@ -414,8 +414,7 @@ class TestAbort(BaseTest):
 
         introspect.abort(self.node.uuid)
 
-        get_mock.assert_called_once_with(self.uuid, ironic=cli,
-                                         locked=False)
+        get_mock.assert_called_once_with(self.uuid, ironic=cli)
         self.node_info.acquire_lock.assert_called_once_with(blocking=False)
         self.sync_filter_mock.assert_called_once_with(cli)
         cli.node.set_power_state.assert_called_once_with(self.uuid, 'off')
@@ -433,8 +432,7 @@ class TestAbort(BaseTest):
 
         introspect.abort(self.node.uuid)
 
-        get_mock.assert_called_once_with(self.uuid, ironic=cli,
-                                         locked=False)
+        get_mock.assert_called_once_with(self.uuid, ironic=cli)
         self.node_info.acquire_lock.assert_called_once_with(blocking=False)
         self.sync_filter_mock.assert_called_once_with(cli)
         self.assertFalse(cli.node.set_power_state.called)
@@ -479,8 +477,7 @@ class TestAbort(BaseTest):
 
         introspect.abort(self.uuid)
 
-        get_mock.assert_called_once_with(self.uuid, ironic=cli,
-                                         locked=False)
+        get_mock.assert_called_once_with(self.uuid, ironic=cli)
         self.node_info.acquire_lock.assert_called_once_with(blocking=False)
         self.sync_filter_mock.assert_called_once_with(cli)
         cli.node.set_power_state.assert_called_once_with(self.uuid, 'off')
@@ -498,8 +495,7 @@ class TestAbort(BaseTest):
 
         introspect.abort(self.uuid)
 
-        get_mock.assert_called_once_with(self.uuid, ironic=cli,
-                                         locked=False)
+        get_mock.assert_called_once_with(self.uuid, ironic=cli)
         self.node_info.acquire_lock.assert_called_once_with(blocking=False)
         self.sync_filter_mock.assert_called_once_with(cli)
         cli.node.set_power_state.assert_called_once_with(self.uuid, 'off')
@@ -102,7 +102,7 @@ class TestNodeCache(test_base.NodeTest):
             uuid=self.uuid).first()
         self.assertIsNone(row_option)
 
-    @mock.patch.object(node_cache, '_get_lock_ctx', autospec=True)
+    @mock.patch.object(node_cache, '_get_lock', autospec=True)
     @mock.patch.object(node_cache, '_list_node_uuids')
     @mock.patch.object(node_cache, '_delete_node')
     def test_delete_nodes_not_in_list(self, mock__delete_node,
@@ -363,9 +363,9 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
         self.assertEqual(1, db.model_query(db.IntrospectionData).count())
         self.assertFalse(get_lock_mock.called)
 
-    @mock.patch.object(node_cache, '_get_lock', autospec=True)
+    @mock.patch.object(node_cache.NodeInfo, 'acquire_lock', autospec=True)
     @mock.patch.object(timeutils, 'utcnow')
-    def test_timeout(self, time_mock, get_lock_mock):
+    def test_timeout(self, time_mock, lock_mock):
         # Add a finished node to confirm we don't try to timeout it
         time_mock.return_value = self.started_at
         session = db.get_writer_session()
@@ -391,8 +391,7 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
                          res)
         self.assertEqual([], db.model_query(db.Attribute).all())
         self.assertEqual([], db.model_query(db.Option).all())
-        get_lock_mock.assert_called_once_with(self.uuid)
-        get_lock_mock.return_value.acquire.assert_called_once_with()
+        lock_mock.assert_called_once_with(mock.ANY, blocking=False)
 
     @mock.patch.object(node_cache, '_get_lock', autospec=True)
     @mock.patch.object(timeutils, 'utcnow')
@@ -416,6 +415,22 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
             [(istate.States.error, current_time, 'Introspection timeout')],
             res)
 
+    @mock.patch.object(node_cache.NodeInfo, 'acquire_lock', autospec=True)
+    @mock.patch.object(timeutils, 'utcnow')
+    def test_timeout_lock_failed(self, time_mock, get_lock_mock):
+        time_mock.return_value = self.started_at
+        CONF.set_override('timeout', 1)
+        get_lock_mock.return_value = False
+        current_time = self.started_at + datetime.timedelta(seconds=2)
+        time_mock.return_value = current_time
+
+        self.assertEqual([], node_cache.clean_up())
+
+        res = [(row.state, row.finished_at, row.error) for row in
+               db.model_query(db.Node).all()]
+        self.assertEqual([('waiting', None, None)], res)
+        get_lock_mock.assert_called_once_with(mock.ANY, blocking=False)
+
 
 class TestNodeCacheGetNode(test_base.NodeTest):
     def test_ok(self):
@@ -434,22 +449,6 @@ class TestNodeCacheGetNode(test_base.NodeTest):
         self.assertIsNone(info.error)
         self.assertFalse(info._locked)
 
-    def test_locked(self):
-        started_at = (datetime.datetime.utcnow() -
-                      datetime.timedelta(seconds=42))
-        session = db.get_writer_session()
-        with session.begin():
-            db.Node(uuid=self.uuid,
-                    state=istate.States.starting,
-                    started_at=started_at).save(session)
-        info = node_cache.get_node(self.uuid, locked=True)
-        self.addCleanup(info.release_lock)
-        self.assertEqual(self.uuid, info.uuid)
-        self.assertEqual(started_at, info.started_at)
-        self.assertIsNone(info.finished_at)
-        self.assertIsNone(info.error)
-        self.assertTrue(info._locked)
-
     def test_not_found(self):
         self.assertRaises(utils.Error, node_cache.get_node,
                           uuidutils.generate_uuid())
@@ -564,7 +564,7 @@ class TestReapply(BaseTest):
     @prepare_mocks
     def test_ok(self, pop_mock, reapply_mock):
        process.reapply(self.uuid)
-        pop_mock.assert_called_once_with(self.uuid, locked=False)
+        pop_mock.assert_called_once_with(self.uuid)
        pop_mock.return_value.acquire_lock.assert_called_once_with(
            blocking=False
        )
@@ -579,7 +579,7 @@ class TestReapply(BaseTest):
             'Node locked, please, try again later',
             process.reapply, self.uuid)
 
-        pop_mock.assert_called_once_with(self.uuid, locked=False)
+        pop_mock.assert_called_once_with(self.uuid)
         pop_mock.return_value.acquire_lock.assert_called_once_with(
             blocking=False
         )
@@ -587,7 +587,7 @@ class TestReapply(BaseTest):
     @prepare_mocks
     def test_reapply_with_data(self, pop_mock, reapply_mock):
         process.reapply(self.uuid, data=self.data)
-        pop_mock.assert_called_once_with(self.uuid, locked=False)
+        pop_mock.assert_called_once_with(self.uuid)
         pop_mock.return_value.acquire_lock.assert_called_once_with(
             blocking=False
         )