Merge "Simplify locking code"
This commit is contained in:
commit
fe8f909619
|
@ -133,7 +133,7 @@ def abort(node_id, token=None):
|
|||
"""
|
||||
LOG.debug('Aborting introspection for node %s', node_id)
|
||||
ironic = ir_utils.get_client(token)
|
||||
node_info = node_cache.get_node(node_id, ironic=ironic, locked=False)
|
||||
node_info = node_cache.get_node(node_id, ironic=ironic)
|
||||
|
||||
# check pending operations
|
||||
locked = node_info.acquire_lock(blocking=False)
|
||||
|
|
|
@ -56,11 +56,6 @@ def _get_lock(uuid):
|
|||
semaphores=_SEMAPHORES)
|
||||
|
||||
|
||||
def _get_lock_ctx(uuid):
|
||||
"""Get context manager yielding a lock object for a given node UUID."""
|
||||
return lockutils.lock(_LOCK_TEMPLATE % uuid, semaphores=_SEMAPHORES)
|
||||
|
||||
|
||||
class NodeInfo(object):
|
||||
"""Record about a node in the cache.
|
||||
|
||||
|
@ -71,7 +66,7 @@ class NodeInfo(object):
|
|||
|
||||
def __init__(self, uuid, version_id=None, state=None, started_at=None,
|
||||
finished_at=None, error=None, node=None, ports=None,
|
||||
ironic=None, lock=None, manage_boot=True):
|
||||
ironic=None, manage_boot=True):
|
||||
self.uuid = uuid
|
||||
self.started_at = started_at
|
||||
self.finished_at = finished_at
|
||||
|
@ -89,9 +84,9 @@ class NodeInfo(object):
|
|||
# equivalent to True actually.
|
||||
self._manage_boot = manage_boot if manage_boot is not None else True
|
||||
# This is a lock on a node UUID, not on a NodeInfo object
|
||||
self._lock = lock if lock is not None else _get_lock(uuid)
|
||||
self._lock = _get_lock(uuid)
|
||||
# Whether lock was acquired using this NodeInfo object
|
||||
self._locked = lock is not None
|
||||
self._locked = False
|
||||
self._fsm = None
|
||||
|
||||
def __del__(self):
|
||||
|
@ -319,12 +314,12 @@ class NodeInfo(object):
|
|||
self._attributes = None
|
||||
|
||||
@classmethod
|
||||
def from_row(cls, row, ironic=None, lock=None, node=None):
|
||||
def from_row(cls, row, ironic=None, node=None):
|
||||
"""Construct NodeInfo from a database row."""
|
||||
fields = {key: row[key]
|
||||
for key in ('uuid', 'version_id', 'state', 'started_at',
|
||||
'finished_at', 'error', 'manage_boot')}
|
||||
return cls(ironic=ironic, lock=lock, node=node, **fields)
|
||||
return cls(ironic=ironic, node=node, **fields)
|
||||
|
||||
def invalidate_cache(self):
|
||||
"""Clear all cached info, so that it's reloaded next time."""
|
||||
|
@ -743,7 +738,7 @@ def delete_nodes_not_in_list(uuids):
|
|||
for uuid in inspector_uuids - uuids:
|
||||
LOG.warning('Node %s was deleted from Ironic, dropping from Ironic '
|
||||
'Inspector database', uuid)
|
||||
with _get_lock_ctx(uuid):
|
||||
with _get_lock(uuid):
|
||||
_delete_node(uuid)
|
||||
|
||||
|
||||
|
@ -783,12 +778,11 @@ def _list_node_uuids():
|
|||
return {x.uuid for x in db.model_query(db.Node.uuid)}
|
||||
|
||||
|
||||
def get_node(node_id, ironic=None, locked=False):
|
||||
def get_node(node_id, ironic=None):
|
||||
"""Get node from cache.
|
||||
|
||||
:param node_id: node UUID or name.
|
||||
:param ironic: optional ironic client instance
|
||||
:param locked: if True, get a lock on node before fetching its data
|
||||
:returns: structure NodeInfo.
|
||||
"""
|
||||
if uuidutils.is_uuid_like(node_id):
|
||||
|
@ -798,22 +792,11 @@ def get_node(node_id, ironic=None, locked=False):
|
|||
node = ir_utils.get_node(node_id, ironic=ironic)
|
||||
uuid = node.uuid
|
||||
|
||||
if locked:
|
||||
lock = _get_lock(uuid)
|
||||
lock.acquire()
|
||||
else:
|
||||
lock = None
|
||||
|
||||
try:
|
||||
row = db.model_query(db.Node).filter_by(uuid=uuid).first()
|
||||
if row is None:
|
||||
raise utils.Error(_('Could not find node %s in cache') % uuid,
|
||||
code=404)
|
||||
return NodeInfo.from_row(row, ironic=ironic, lock=lock, node=node)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
if lock is not None:
|
||||
lock.release()
|
||||
row = db.model_query(db.Node).filter_by(uuid=uuid).first()
|
||||
if row is None:
|
||||
raise utils.Error(_('Could not find node %s in cache') % uuid,
|
||||
code=404)
|
||||
return NodeInfo.from_row(row, ironic=ironic, node=node)
|
||||
|
||||
|
||||
def find_node(**attributes):
|
||||
|
@ -911,22 +894,28 @@ def clean_up():
|
|||
return []
|
||||
|
||||
LOG.error('Introspection for nodes %s has timed out', uuids)
|
||||
locked_uuids = []
|
||||
for u in uuids:
|
||||
node_info = get_node(u, locked=True)
|
||||
try:
|
||||
if node_info.finished_at or node_info.started_at > threshold:
|
||||
continue
|
||||
if node_info.state != istate.States.waiting:
|
||||
LOG.error('Something went wrong, timeout occurred '
|
||||
'while introspection in "%s" state',
|
||||
node_info.state,
|
||||
node_info=node_info)
|
||||
node_info.finished(
|
||||
istate.Events.timeout, error='Introspection timeout')
|
||||
finally:
|
||||
node_info.release_lock()
|
||||
node_info = get_node(u)
|
||||
if node_info.acquire_lock(blocking=False):
|
||||
try:
|
||||
if node_info.finished_at or node_info.started_at > threshold:
|
||||
continue
|
||||
if node_info.state != istate.States.waiting:
|
||||
LOG.error('Something went wrong, timeout occurred '
|
||||
'while introspection in "%s" state',
|
||||
node_info.state,
|
||||
node_info=node_info)
|
||||
node_info.finished(
|
||||
istate.Events.timeout, error='Introspection timeout')
|
||||
locked_uuids.append(u)
|
||||
finally:
|
||||
node_info.release_lock()
|
||||
else:
|
||||
LOG.info('Failed to acquire lock when updating node state',
|
||||
node_info=node_info)
|
||||
|
||||
return uuids
|
||||
return locked_uuids
|
||||
|
||||
|
||||
def create_node(driver, ironic=None, **attributes):
|
||||
|
|
|
@ -315,7 +315,7 @@ def reapply(node_uuid, data=None):
|
|||
|
||||
LOG.debug('Processing re-apply introspection request for node '
|
||||
'UUID: %s', node_uuid)
|
||||
node_info = node_cache.get_node(node_uuid, locked=False)
|
||||
node_info = node_cache.get_node(node_uuid)
|
||||
if not node_info.acquire_lock(blocking=False):
|
||||
# Note (mkovacik): it should be sufficient to check data
|
||||
# presence & locking. If either introspection didn't start
|
||||
|
|
|
@ -414,8 +414,7 @@ class TestAbort(BaseTest):
|
|||
|
||||
introspect.abort(self.node.uuid)
|
||||
|
||||
get_mock.assert_called_once_with(self.uuid, ironic=cli,
|
||||
locked=False)
|
||||
get_mock.assert_called_once_with(self.uuid, ironic=cli)
|
||||
self.node_info.acquire_lock.assert_called_once_with(blocking=False)
|
||||
self.sync_filter_mock.assert_called_once_with(cli)
|
||||
cli.node.set_power_state.assert_called_once_with(self.uuid, 'off')
|
||||
|
@ -433,8 +432,7 @@ class TestAbort(BaseTest):
|
|||
|
||||
introspect.abort(self.node.uuid)
|
||||
|
||||
get_mock.assert_called_once_with(self.uuid, ironic=cli,
|
||||
locked=False)
|
||||
get_mock.assert_called_once_with(self.uuid, ironic=cli)
|
||||
self.node_info.acquire_lock.assert_called_once_with(blocking=False)
|
||||
self.sync_filter_mock.assert_called_once_with(cli)
|
||||
self.assertFalse(cli.node.set_power_state.called)
|
||||
|
@ -479,8 +477,7 @@ class TestAbort(BaseTest):
|
|||
|
||||
introspect.abort(self.uuid)
|
||||
|
||||
get_mock.assert_called_once_with(self.uuid, ironic=cli,
|
||||
locked=False)
|
||||
get_mock.assert_called_once_with(self.uuid, ironic=cli)
|
||||
self.node_info.acquire_lock.assert_called_once_with(blocking=False)
|
||||
self.sync_filter_mock.assert_called_once_with(cli)
|
||||
cli.node.set_power_state.assert_called_once_with(self.uuid, 'off')
|
||||
|
@ -498,8 +495,7 @@ class TestAbort(BaseTest):
|
|||
|
||||
introspect.abort(self.uuid)
|
||||
|
||||
get_mock.assert_called_once_with(self.uuid, ironic=cli,
|
||||
locked=False)
|
||||
get_mock.assert_called_once_with(self.uuid, ironic=cli)
|
||||
self.node_info.acquire_lock.assert_called_once_with(blocking=False)
|
||||
self.sync_filter_mock.assert_called_once_with(cli)
|
||||
cli.node.set_power_state.assert_called_once_with(self.uuid, 'off')
|
||||
|
|
|
@ -102,7 +102,7 @@ class TestNodeCache(test_base.NodeTest):
|
|||
uuid=self.uuid).first()
|
||||
self.assertIsNone(row_option)
|
||||
|
||||
@mock.patch.object(node_cache, '_get_lock_ctx', autospec=True)
|
||||
@mock.patch.object(node_cache, '_get_lock', autospec=True)
|
||||
@mock.patch.object(node_cache, '_list_node_uuids')
|
||||
@mock.patch.object(node_cache, '_delete_node')
|
||||
def test_delete_nodes_not_in_list(self, mock__delete_node,
|
||||
|
@ -368,9 +368,9 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
|
|||
self.assertEqual(1, db.model_query(db.IntrospectionData).count())
|
||||
self.assertFalse(get_lock_mock.called)
|
||||
|
||||
@mock.patch.object(node_cache, '_get_lock', autospec=True)
|
||||
@mock.patch.object(node_cache.NodeInfo, 'acquire_lock', autospec=True)
|
||||
@mock.patch.object(timeutils, 'utcnow')
|
||||
def test_timeout(self, time_mock, get_lock_mock):
|
||||
def test_timeout(self, time_mock, lock_mock):
|
||||
# Add a finished node to confirm we don't try to timeout it
|
||||
time_mock.return_value = self.started_at
|
||||
session = db.get_writer_session()
|
||||
|
@ -396,8 +396,7 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
|
|||
res)
|
||||
self.assertEqual([], db.model_query(db.Attribute).all())
|
||||
self.assertEqual([], db.model_query(db.Option).all())
|
||||
get_lock_mock.assert_called_once_with(self.uuid)
|
||||
get_lock_mock.return_value.acquire.assert_called_once_with()
|
||||
lock_mock.assert_called_once_with(mock.ANY, blocking=False)
|
||||
|
||||
@mock.patch.object(node_cache, '_get_lock', autospec=True)
|
||||
@mock.patch.object(timeutils, 'utcnow')
|
||||
|
@ -421,6 +420,22 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
|
|||
[(istate.States.error, current_time, 'Introspection timeout')],
|
||||
res)
|
||||
|
||||
@mock.patch.object(node_cache.NodeInfo, 'acquire_lock', autospec=True)
|
||||
@mock.patch.object(timeutils, 'utcnow')
|
||||
def test_timeout_lock_failed(self, time_mock, get_lock_mock):
|
||||
time_mock.return_value = self.started_at
|
||||
CONF.set_override('timeout', 1)
|
||||
get_lock_mock.return_value = False
|
||||
current_time = self.started_at + datetime.timedelta(seconds=2)
|
||||
time_mock.return_value = current_time
|
||||
|
||||
self.assertEqual([], node_cache.clean_up())
|
||||
|
||||
res = [(row.state, row.finished_at, row.error) for row in
|
||||
db.model_query(db.Node).all()]
|
||||
self.assertEqual([('waiting', None, None)], res)
|
||||
get_lock_mock.assert_called_once_with(mock.ANY, blocking=False)
|
||||
|
||||
|
||||
class TestNodeCacheGetNode(test_base.NodeTest):
|
||||
def test_ok(self):
|
||||
|
@ -439,22 +454,6 @@ class TestNodeCacheGetNode(test_base.NodeTest):
|
|||
self.assertIsNone(info.error)
|
||||
self.assertFalse(info._locked)
|
||||
|
||||
def test_locked(self):
|
||||
started_at = (datetime.datetime.utcnow() -
|
||||
datetime.timedelta(seconds=42))
|
||||
session = db.get_writer_session()
|
||||
with session.begin():
|
||||
db.Node(uuid=self.uuid,
|
||||
state=istate.States.starting,
|
||||
started_at=started_at).save(session)
|
||||
info = node_cache.get_node(self.uuid, locked=True)
|
||||
self.addCleanup(info.release_lock)
|
||||
self.assertEqual(self.uuid, info.uuid)
|
||||
self.assertEqual(started_at, info.started_at)
|
||||
self.assertIsNone(info.finished_at)
|
||||
self.assertIsNone(info.error)
|
||||
self.assertTrue(info._locked)
|
||||
|
||||
def test_not_found(self):
|
||||
self.assertRaises(utils.Error, node_cache.get_node,
|
||||
uuidutils.generate_uuid())
|
||||
|
|
|
@ -546,7 +546,7 @@ class TestReapply(BaseTest):
|
|||
@prepare_mocks
|
||||
def test_ok(self, pop_mock, reapply_mock):
|
||||
process.reapply(self.uuid)
|
||||
pop_mock.assert_called_once_with(self.uuid, locked=False)
|
||||
pop_mock.assert_called_once_with(self.uuid)
|
||||
pop_mock.return_value.acquire_lock.assert_called_once_with(
|
||||
blocking=False
|
||||
)
|
||||
|
@ -561,7 +561,7 @@ class TestReapply(BaseTest):
|
|||
'Node locked, please, try again later',
|
||||
process.reapply, self.uuid)
|
||||
|
||||
pop_mock.assert_called_once_with(self.uuid, locked=False)
|
||||
pop_mock.assert_called_once_with(self.uuid)
|
||||
pop_mock.return_value.acquire_lock.assert_called_once_with(
|
||||
blocking=False
|
||||
)
|
||||
|
@ -569,7 +569,7 @@ class TestReapply(BaseTest):
|
|||
@prepare_mocks
|
||||
def test_reapply_with_data(self, pop_mock, reapply_mock):
|
||||
process.reapply(self.uuid, data=self.data)
|
||||
pop_mock.assert_called_once_with(self.uuid, locked=False)
|
||||
pop_mock.assert_called_once_with(self.uuid)
|
||||
pop_mock.return_value.acquire_lock.assert_called_once_with(
|
||||
blocking=False
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue