diff --git a/ironic_inspector/introspect.py b/ironic_inspector/introspect.py index 981eda337..37edb5f89 100644 --- a/ironic_inspector/introspect.py +++ b/ironic_inspector/introspect.py @@ -117,6 +117,26 @@ def introspect(uuid, new_ipmi_credentials=None, token=None): def _background_introspect(ironic, node_info): global _LAST_INTROSPECTION_TIME + if not node_info.options.get('new_ipmi_credentials'): + if re.match(CONF.introspection_delay_drivers, node_info.node().driver): + LOG.debug('Attempting to acquire lock on last introspection time') + with _LAST_INTROSPECTION_LOCK: + delay = (_LAST_INTROSPECTION_TIME - time.time() + + CONF.introspection_delay) + if delay > 0: + LOG.debug('Waiting %d seconds before sending the next ' + 'node on introspection', delay) + time.sleep(delay) + _LAST_INTROSPECTION_TIME = time.time() + + node_info.acquire_lock() + try: + _background_introspect_locked(ironic, node_info) + finally: + node_info.release_lock() + + +def _background_introspect_locked(ironic, node_info): # TODO(dtantsur): pagination macs = list(node_info.ports()) if macs: @@ -146,17 +166,6 @@ def _background_introspect(ironic, node_info): ' node %(node)s: %(exc)s') % {'node': node_info.uuid, 'exc': exc}) - if re.match(CONF.introspection_delay_drivers, node_info.node().driver): - LOG.debug('Attempting to acquire lock on last introspection time') - with _LAST_INTROSPECTION_LOCK: - delay = (_LAST_INTROSPECTION_TIME - time.time() - + CONF.introspection_delay) - if delay > 0: - LOG.debug('Waiting %d seconds before sending the next ' - 'node on introspection', delay) - time.sleep(delay) - _LAST_INTROSPECTION_TIME = time.time() - try: ironic.node.set_power_state(node_info.uuid, 'reboot') except Exception as exc: diff --git a/ironic_inspector/node_cache.py b/ironic_inspector/node_cache.py index 772be148a..63ed59c05 100644 --- a/ironic_inspector/node_cache.py +++ b/ironic_inspector/node_cache.py @@ -18,9 +18,11 @@ import json import time from ironicclient import exceptions +from oslo_concurrency import lockutils from oslo_config import cfg from oslo_db import exception as db_exc from oslo_log import log +from oslo_utils import excutils from sqlalchemy import text from ironic_inspector import db @@ -34,13 +36,31 @@ LOG = log.getLogger("ironic_inspector.node_cache") MACS_ATTRIBUTE = 'mac' +_LOCK_TEMPLATE = 'node-%s' +_SEMAPHORES = lockutils.Semaphores() + + +def _get_lock(uuid): + """Get lock object for a given node UUID.""" + return lockutils.internal_lock(_LOCK_TEMPLATE % uuid, + semaphores=_SEMAPHORES) + + +def _get_lock_ctx(uuid): + """Get context manager yielding a lock object for a given node UUID.""" + return lockutils.lock(_LOCK_TEMPLATE % uuid, semaphores=_SEMAPHORES) class NodeInfo(object): - """Record about a node in the cache.""" + """Record about a node in the cache. - def __init__(self, uuid, started_at, finished_at=None, error=None, - node=None, ports=None, ironic=None): + This class optionally allows to acquire a lock on a node. Note that the + class instance itself is NOT thread-safe, you need to create a new instance + for every thread. + """ + + def __init__(self, uuid, started_at=None, finished_at=None, error=None, + node=None, ports=None, ironic=None, lock=None): self.uuid = uuid self.started_at = started_at self.finished_at = finished_at @@ -52,6 +72,48 @@ class NodeInfo(object): self._ports = ports self._attributes = None self._ironic = ironic + # This is a lock on a node UUID, not on a NodeInfo object + self._lock = lock if lock is not None else _get_lock(uuid) + # Whether lock was acquired using this NodeInfo object + self._locked = lock is not None + + def __del__(self): + if self._locked: + LOG.warning(_LW('BUG: node lock was not released by the moment ' + 'node info object is deleted')) + self._lock.release() + + def acquire_lock(self, blocking=True): + """Acquire a lock on the associated node. + + Exits with success if a lock is already acquired using this NodeInfo + object. + + :param blocking: if True, wait for lock to be acquired, otherwise + return immediately. + :returns: boolean value, whether lock was acquired successfully + """ + if self._locked: + return True + + LOG.debug('Attempting to acquire lock on node %s', self.uuid) + if self._lock.acquire(blocking): + self._locked = True + LOG.debug('Successfully acquired lock on node %s', self.uuid) + return True + else: + LOG.debug('Unable to acquire lock on node %s', self.uuid) + return False + + def release_lock(self): + """Release a lock on a node. + + Does nothing if lock was not acquired using this NodeInfo object. + """ + if self._locked: + LOG.debug('Successfully released lock on node %s', self.uuid) + self._lock.release() + self._locked = False @property def options(self): @@ -98,6 +160,8 @@ class NodeInfo(object): :param error: error message """ + self.release_lock() + self.finished_at = time.time() self.error = error @@ -136,11 +200,11 @@ class NodeInfo(object): self._attributes = None @classmethod - def from_row(cls, row, ironic=None): + def from_row(cls, row, ironic=None, lock=None): """Construct NodeInfo from a database row.""" fields = {key: row[key] for key in ('uuid', 'started_at', 'finished_at', 'error')} - return cls(ironic=ironic, **fields) + return cls(ironic=ironic, lock=lock, **fields) def invalidate_cache(self): """Clear all cached info, so that it's reloaded next time.""" @@ -333,7 +397,8 @@ def delete_nodes_not_in_list(uuids): LOG.warning( _LW('Node %s was deleted from Ironic, dropping from Ironic ' 'Inspector database'), uuid) - _delete_node(uuid) + with _get_lock_ctx(uuid): + _delete_node(uuid) def _delete_node(uuid, session=None): @@ -362,23 +427,37 @@ def _list_node_uuids(): return {x.uuid for x in db.model_query(db.Node.uuid)} -def get_node(uuid, ironic=None): +def get_node(uuid, ironic=None, locked=False): """Get node from cache by it's UUID. :param uuid: node UUID. :param ironic: optional ironic client instance + :param locked: if True, get a lock on node before fetching its data :returns: structure NodeInfo. """ - row = db.model_query(db.Node).filter_by(uuid=uuid).first() - if row is None: - raise utils.Error(_('Could not find node %s in cache') % uuid, - code=404) - return NodeInfo.from_row(row, ironic=ironic) + if locked: + lock = _get_lock(uuid) + lock.acquire() + else: + lock = None + + try: + row = db.model_query(db.Node).filter_by(uuid=uuid).first() + if row is None: + raise utils.Error(_('Could not find node %s in cache') % uuid, + code=404) + return NodeInfo.from_row(row, ironic=ironic, lock=lock) + except Exception: + with excutils.save_and_reraise_exception(): + if lock is not None: + lock.release() def find_node(**attributes): """Find node in cache. + This function acquires a lock on a node. + :param attributes: attributes known about this node (like macs, BMC etc) also ironic client instance may be passed under 'ironic' :returns: structure NodeInfo with attributes ``uuid`` and ``created_at`` @@ -417,20 +496,28 @@ def find_node(**attributes): % {'attr': attributes, 'found': list(found)}, code=404) uuid = found.pop() - row = (db.model_query(db.Node.started_at, db.Node.finished_at). - filter_by(uuid=uuid).first()) + node_info = NodeInfo(uuid=uuid, ironic=ironic) + node_info.acquire_lock() - if not row: - raise utils.Error(_( - 'Could not find node %s in introspection cache, ' - 'probably it\'s not on introspection now') % uuid, code=404) + try: + row = (db.model_query(db.Node.started_at, db.Node.finished_at). + filter_by(uuid=uuid).first()) - if row.finished_at: - raise utils.Error(_( - 'Introspection for node %(node)s already finished on ' - '%(finish)s') % {'node': uuid, 'finish': row.finished_at}) + if not row: + raise utils.Error(_( + 'Could not find node %s in introspection cache, ' + 'probably it\'s not on introspection now') % uuid, code=404) - return NodeInfo(uuid=uuid, started_at=row.started_at, ironic=ironic) + if row.finished_at: + raise utils.Error(_( + 'Introspection for node %(node)s already finished on ' + '%(finish)s') % {'node': uuid, 'finish': row.finished_at}) + + node_info.started_at = row.started_at + return node_info + except Exception: + with excutils.save_and_reraise_exception(): + node_info.release_lock() def clean_up(): @@ -461,15 +548,20 @@ def clean_up(): return [] LOG.error(_LE('Introspection for nodes %s has timed out'), uuids) - query = db.model_query(db.Node, session=session).filter( - db.Node.started_at < threshold, - db.Node.finished_at.is_(None)) - query.update({'finished_at': time.time(), - 'error': 'Introspection timeout'}) for u in uuids: - db.model_query(db.Attribute, session=session).filter_by( - uuid=u).delete() - db.model_query(db.Option, session=session).filter_by( - uuid=u).delete() + node_info = get_node(u, locked=True) + try: + if node_info.finished_at or node_info.started_at > threshold: + continue + + db.model_query(db.Node, session=session).filter_by( + uuid=u).update({'finished_at': time.time(), + 'error': 'Introspection timeout'}) + db.model_query(db.Attribute, session=session).filter_by( + uuid=u).delete() + db.model_query(db.Option, session=session).filter_by( + uuid=u).delete() + finally: + node_info.release_lock() return uuids diff --git a/ironic_inspector/process.py b/ironic_inspector/process.py index 97527d68b..f34affb7b 100644 --- a/ironic_inspector/process.py +++ b/ironic_inspector/process.py @@ -85,6 +85,10 @@ def process(introspection_data): 'in hook %s') % hook_ext.name) node_info = _find_node_info(introspection_data, failures) + if node_info: + # Locking is already done in find_node() but may be not done in a + # node_not_found hook + node_info.acquire_lock() if failures and node_info: msg = _('The following failures happened during running ' diff --git a/ironic_inspector/test/base.py b/ironic_inspector/test/base.py index bce3798aa..88bd546c1 100644 --- a/ironic_inspector/test/base.py +++ b/ironic_inspector/test/base.py @@ -14,6 +14,7 @@ import unittest import mock +from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log @@ -57,6 +58,7 @@ class BaseTest(unittest.TestCase): engine.connect() self.addCleanup(db.get_engine().dispose) plugins_base._HOOKS_MGR = None + node_cache._SEMAPHORES = lockutils.Semaphores() for name in ('_', '_LI', '_LW', '_LE', '_LC'): patch = mock.patch.object(i18n, name, lambda s: s) patch.start() diff --git a/ironic_inspector/test/test_introspect.py b/ironic_inspector/test/test_introspect.py index f2bbd509c..5b9f5a198 100644 --- a/ironic_inspector/test/test_introspect.py +++ b/ironic_inspector/test/test_introspect.py @@ -82,8 +82,10 @@ class TestIntrospect(BaseTest): persistent=False) cli.node.set_power_state.assert_called_once_with(self.uuid, 'reboot') - add_mock.return_value.set_option.assert_called_once_with( + self.node_info.set_option.assert_called_once_with( 'new_ipmi_credentials', None) + self.node_info.acquire_lock.assert_called_once_with() + self.node_info.release_lock.assert_called_once_with() def test_ok_ilo_and_drac(self, client_mock, add_mock, filters_mock): cli = self._prepare(client_mock) @@ -117,6 +119,8 @@ class TestIntrospect(BaseTest): 'reboot') add_mock.return_value.finished.assert_called_once_with( error=mock.ANY) + self.node_info.acquire_lock.assert_called_once_with() + self.node_info.release_lock.assert_called_once_with() def test_unexpected_error(self, client_mock, add_mock, filters_mock): cli = self._prepare(client_mock) @@ -133,6 +137,8 @@ class TestIntrospect(BaseTest): self.assertFalse(cli.node.set_boot_device.called) add_mock.return_value.finished.assert_called_once_with( error=mock.ANY) + self.node_info.acquire_lock.assert_called_once_with() + self.node_info.release_lock.assert_called_once_with() def test_with_maintenance(self, client_mock, add_mock, filters_mock): cli = client_mock.return_value @@ -194,6 +200,8 @@ class TestIntrospect(BaseTest): self.node_info.finished.assert_called_once_with(error=mock.ANY) self.assertEqual(0, filters_mock.call_count) self.assertEqual(0, cli.node.set_power_state.call_count) + self.node_info.acquire_lock.assert_called_once_with() + self.node_info.release_lock.assert_called_once_with() def test_no_lookup_attrs_with_node_not_found_hook(self, client_mock, add_mock, filters_mock): @@ -229,6 +237,7 @@ class TestIntrospect(BaseTest): self.assertEqual(0, filters_mock.call_count) self.assertEqual(0, cli.node.set_power_state.call_count) self.assertFalse(add_mock.called) + self.assertFalse(self.node_info.acquire_lock.called) def test_failed_to_validate_node(self, client_mock, add_mock, filters_mock): @@ -247,6 +256,7 @@ class TestIntrospect(BaseTest): self.assertEqual(0, filters_mock.call_count) self.assertEqual(0, cli.node.set_power_state.call_count) self.assertFalse(add_mock.called) + self.assertFalse(self.node_info.acquire_lock.called) def test_wrong_provision_state(self, client_mock, add_mock, filters_mock): self.node.provision_state = 'active' @@ -261,6 +271,7 @@ class TestIntrospect(BaseTest): self.assertEqual(0, filters_mock.call_count) self.assertEqual(0, cli.node.set_power_state.call_count) self.assertFalse(add_mock.called) + self.assertFalse(self.node_info.acquire_lock.called) @mock.patch.object(time, 'sleep') @mock.patch.object(time, 'time') diff --git a/ironic_inspector/test/test_node_cache.py b/ironic_inspector/test/test_node_cache.py index aac90cdca..a84f20c4e 100644 --- a/ironic_inspector/test/test_node_cache.py +++ b/ironic_inspector/test/test_node_cache.py @@ -41,6 +41,7 @@ class TestNodeCache(test_base.NodeTest): bmc_address='1.2.3.4', foo=None) self.assertEqual(self.uuid, res.uuid) self.assertTrue(time.time() - 60 < res.started_at < time.time() + 60) + self.assertFalse(res._locked) res = (db.model_query(db.Node.uuid, db.Node.started_at).order_by(db.Node.uuid).all()) @@ -80,10 +81,12 @@ class TestNodeCache(test_base.NodeTest): uuid=self.uuid).first() self.assertIsNone(row_option) + @mock.patch.object(node_cache, '_get_lock_ctx', autospec=True) @mock.patch.object(node_cache, '_list_node_uuids') @mock.patch.object(node_cache, '_delete_node') def test_delete_nodes_not_in_list(self, mock__delete_node, - mock__list_node_uuids): + mock__list_node_uuids, + mock__get_lock_ctx): uuid2 = 'uuid2' uuids = {self.uuid} mock__list_node_uuids.return_value = {self.uuid, uuid2} @@ -91,6 +94,8 @@ class TestNodeCache(test_base.NodeTest): with session.begin(): node_cache.delete_nodes_not_in_list(uuids) mock__delete_node.assert_called_once_with(uuid2) + mock__get_lock_ctx.assert_called_once_with(uuid2) + mock__get_lock_ctx.return_value.__enter__.assert_called_once_with() def test_add_node_duplicate_mac(self): session = db.get_session() @@ -178,11 +183,13 @@ class TestNodeCacheFind(test_base.NodeTest): res = node_cache.find_node(bmc_address='1.2.3.4') self.assertEqual(self.uuid, res.uuid) self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1) + self.assertTrue(res._locked) def test_macs(self): res = node_cache.find_node(mac=['11:22:33:33:33:33', self.macs[1]]) self.assertEqual(self.uuid, res.uuid) self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1) + self.assertTrue(res._locked) def test_macs_not_found(self): self.assertRaises(utils.Error, node_cache.find_node, @@ -199,6 +206,7 @@ class TestNodeCacheFind(test_base.NodeTest): mac=self.macs) self.assertEqual(self.uuid, res.uuid) self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1) + self.assertTrue(res._locked) def test_inconsistency(self): session = db.get_session() @@ -244,8 +252,9 @@ class TestNodeCacheCleanUp(test_base.NodeTest): db.model_query(db.Attribute).count()) self.assertEqual(1, db.model_query(db.Option).count()) + @mock.patch.object(node_cache, '_get_lock', autospec=True) @mock.patch.object(time, 'time') - def test_ok(self, time_mock): + def test_ok(self, time_mock, get_lock_mock): time_mock.return_value = 1000 self.assertFalse(node_cache.clean_up()) @@ -256,9 +265,11 @@ class TestNodeCacheCleanUp(test_base.NodeTest): self.assertEqual(len(self.macs), db.model_query(db.Attribute).count()) self.assertEqual(1, db.model_query(db.Option).count()) + self.assertFalse(get_lock_mock.called) + @mock.patch.object(node_cache, '_get_lock', autospec=True) @mock.patch.object(time, 'time') - def test_timeout(self, time_mock): + def test_timeout(self, time_mock, get_lock_mock): # Add a finished node to confirm we don't try to timeout it time_mock.return_value = self.started_at session = db.get_session() @@ -277,6 +288,8 @@ class TestNodeCacheCleanUp(test_base.NodeTest): res) self.assertEqual([], db.model_query(db.Attribute).all()) self.assertEqual([], db.model_query(db.Option).all()) + get_lock_mock.assert_called_once_with(self.uuid) + get_lock_mock.return_value.acquire.assert_called_once_with() def test_old_status(self): CONF.set_override('node_status_keep_time', 42) @@ -302,6 +315,20 @@ class TestNodeCacheGetNode(test_base.NodeTest): self.assertEqual(started_at, info.started_at) self.assertIsNone(info.finished_at) self.assertIsNone(info.error) + self.assertFalse(info._locked) + + def test_locked(self): + started_at = time.time() - 42 + session = db.get_session() + with session.begin(): + db.Node(uuid=self.uuid, started_at=started_at).save(session) + info = node_cache.get_node(self.uuid, locked=True) + + self.assertEqual(self.uuid, info.uuid) + self.assertEqual(started_at, info.started_at) + self.assertIsNone(info.finished_at) + self.assertIsNone(info.error) + self.assertTrue(info._locked) def test_not_found(self): self.assertRaises(utils.Error, node_cache.get_node, 'foo') @@ -343,6 +370,11 @@ class TestNodeInfoFinished(test_base.NodeTest): self.assertEqual([], db.model_query(db.Attribute).all()) self.assertEqual([], db.model_query(db.Option).all()) + def test_release_lock(self): + self.node_info.acquire_lock() + self.node_info.finished() + self.assertFalse(self.node_info._locked) + class TestInit(unittest.TestCase): def setUp(self): @@ -574,3 +606,43 @@ class TestNodeCacheGetByPath(test_base.NodeTest): self.assertEqual(42, self.node_info.get_by_path('/properties/answer')) self.assertRaises(KeyError, self.node_info.get_by_path, '/foo') self.assertRaises(KeyError, self.node_info.get_by_path, '/extra/foo') + + +@mock.patch.object(node_cache, '_get_lock', autospec=True) +class TestLock(test_base.NodeTest): + def test_acquire(self, get_lock_mock): + node_info = node_cache.NodeInfo(self.uuid) + self.assertFalse(node_info._locked) + get_lock_mock.assert_called_once_with(self.uuid) + self.assertFalse(get_lock_mock.return_value.acquire.called) + + self.assertTrue(node_info.acquire_lock()) + self.assertTrue(node_info._locked) + self.assertTrue(node_info.acquire_lock()) + self.assertTrue(node_info._locked) + get_lock_mock.return_value.acquire.assert_called_once_with(True) + + def test_release(self, get_lock_mock): + node_info = node_cache.NodeInfo(self.uuid) + node_info.acquire_lock() + self.assertTrue(node_info._locked) + node_info.release_lock() + self.assertFalse(node_info._locked) + node_info.release_lock() + self.assertFalse(node_info._locked) + get_lock_mock.return_value.acquire.assert_called_once_with(True) + get_lock_mock.return_value.release.assert_called_once_with() + + def test_acquire_non_blocking(self, get_lock_mock): + node_info = node_cache.NodeInfo(self.uuid) + self.assertFalse(node_info._locked) + get_lock_mock.return_value.acquire.side_effect = iter([False, True]) + + self.assertFalse(node_info.acquire_lock(blocking=False)) + self.assertFalse(node_info._locked) + self.assertTrue(node_info.acquire_lock(blocking=False)) + self.assertTrue(node_info._locked) + self.assertTrue(node_info.acquire_lock(blocking=False)) + self.assertTrue(node_info._locked) + get_lock_mock.return_value.acquire.assert_called_with(False) + self.assertEqual(2, get_lock_mock.return_value.acquire.call_count) diff --git a/releasenotes/notes/node-locking-4d135ca5b93524b1.yaml b/releasenotes/notes/node-locking-4d135ca5b93524b1.yaml new file mode 100644 index 000000000..77896d9e5 --- /dev/null +++ b/releasenotes/notes/node-locking-4d135ca5b93524b1.yaml @@ -0,0 +1,3 @@ +--- +fixes: + - Acquire a lock on a node UUID when handling it. diff --git a/requirements.txt b/requirements.txt index a9c7f229f..9d51e0362 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ pbr>=1.6 python-ironicclient>=0.8.0 python-keystoneclient!=1.8.0,>=1.6.0 python-swiftclient>=2.2.0 +oslo.concurrency>=2.3.0 # Apache-2.0 oslo.config>=2.7.0 # Apache-2.0 oslo.db>=3.2.0 # Apache-2.0 oslo.i18n>=1.5.0 # Apache-2.0