Merge "Lock nodes to avoid simultaneous introspection requests"

Jenkins 2015-12-02 14:03:18 +00:00 committed by Gerrit Code Review
commit ef8fc580f2
8 changed files with 241 additions and 47 deletions


@ -117,6 +117,26 @@ def introspect(uuid, new_ipmi_credentials=None, token=None):
def _background_introspect(ironic, node_info):
global _LAST_INTROSPECTION_TIME
if not node_info.options.get('new_ipmi_credentials'):
if re.match(CONF.introspection_delay_drivers, node_info.node().driver):
LOG.debug('Attempting to acquire lock on last introspection time')
with _LAST_INTROSPECTION_LOCK:
delay = (_LAST_INTROSPECTION_TIME - time.time()
+ CONF.introspection_delay)
if delay > 0:
LOG.debug('Waiting %d seconds before sending the next '
'node on introspection', delay)
time.sleep(delay)
_LAST_INTROSPECTION_TIME = time.time()
node_info.acquire_lock()
try:
_background_introspect_locked(ironic, node_info)
finally:
node_info.release_lock()
def _background_introspect_locked(ironic, node_info):
# TODO(dtantsur): pagination
macs = list(node_info.ports())
if macs:
@ -146,17 +166,6 @@ def _background_introspect(ironic, node_info):
' node %(node)s: %(exc)s') %
{'node': node_info.uuid, 'exc': exc})
if re.match(CONF.introspection_delay_drivers, node_info.node().driver):
LOG.debug('Attempting to acquire lock on last introspection time')
with _LAST_INTROSPECTION_LOCK:
delay = (_LAST_INTROSPECTION_TIME - time.time()
+ CONF.introspection_delay)
if delay > 0:
LOG.debug('Waiting %d seconds before sending the next '
'node on introspection', delay)
time.sleep(delay)
_LAST_INTROSPECTION_TIME = time.time()
try:
ironic.node.set_power_state(node_info.uuid, 'reboot')
except Exception as exc:
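Taken together, the two hunks above move the introspection-delay throttle so that it runs before the per-node lock is taken, and wrap the remaining work in acquire_lock()/release_lock(). A stand-alone sketch of the throttle pattern, with an illustrative helper name and a fixed delay standing in for the CONF options, looks like this:

import threading
import time

_LAST_INTROSPECTION_TIME = 0
_LAST_INTROSPECTION_LOCK = threading.Lock()
INTROSPECTION_DELAY = 5  # illustrative stand-in for CONF.introspection_delay


def _wait_for_introspection_slot():
    """Block so that nodes are powered on at most once per INTROSPECTION_DELAY."""
    global _LAST_INTROSPECTION_TIME
    with _LAST_INTROSPECTION_LOCK:
        delay = _LAST_INTROSPECTION_TIME - time.time() + INTROSPECTION_DELAY
        if delay > 0:
            time.sleep(delay)
        _LAST_INTROSPECTION_TIME = time.time()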


@ -18,9 +18,11 @@ import json
import time
from ironicclient import exceptions
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log
from oslo_utils import excutils
from sqlalchemy import text
from ironic_inspector import db
@ -34,13 +36,31 @@ LOG = log.getLogger("ironic_inspector.node_cache")
MACS_ATTRIBUTE = 'mac'
_LOCK_TEMPLATE = 'node-%s'
_SEMAPHORES = lockutils.Semaphores()
def _get_lock(uuid):
"""Get lock object for a given node UUID."""
return lockutils.internal_lock(_LOCK_TEMPLATE % uuid,
semaphores=_SEMAPHORES)
def _get_lock_ctx(uuid):
"""Get context manager yielding a lock object for a given node UUID."""
return lockutils.lock(_LOCK_TEMPLATE % uuid, semaphores=_SEMAPHORES)
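# Hedged usage sketch for the two helpers above (not part of this change;
# '1234' stands in for a real node UUID). Relying on oslo.concurrency's
# documented behaviour, both forms map the same name to the same semaphore in
# _SEMAPHORES, so concurrent callers serialize per node.
def _example_lock_usage(uuid='1234'):
    lock = _get_lock(uuid)        # plain lock object, acquired/released by hand
    if lock.acquire():            # blocks until the node lock is free
        try:
            pass                  # work on the node while holding its lock
        finally:
            lock.release()

    with _get_lock_ctx(uuid):     # scoped form, as delete_nodes_not_in_list() uses
        pass                      # work on the node while holding its lock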
class NodeInfo(object):
"""Record about a node in the cache."""
def __init__(self, uuid, started_at, finished_at=None, error=None,
node=None, ports=None, ironic=None):
"""Record about a node in the cache.
This class optionally allows to acquire a lock on a node. Note that the
class instance itself is NOT thread-safe, you need to create a new instance
for every thread.
"""
def __init__(self, uuid, started_at=None, finished_at=None, error=None,
node=None, ports=None, ironic=None, lock=None):
self.uuid = uuid
self.started_at = started_at
self.finished_at = finished_at
@ -52,6 +72,48 @@ class NodeInfo(object):
self._ports = ports
self._attributes = None
self._ironic = ironic
# This is a lock on a node UUID, not on a NodeInfo object
self._lock = lock if lock is not None else _get_lock(uuid)
# Whether lock was acquired using this NodeInfo object
self._locked = lock is not None
def __del__(self):
if self._locked:
LOG.warning(_LW('BUG: node lock was not released by the moment '
'node info object is deleted'))
self._lock.release()
def acquire_lock(self, blocking=True):
"""Acquire a lock on the associated node.
Exits with success if a lock is already acquired using this NodeInfo
object.
:param blocking: if True, wait for lock to be acquired, otherwise
return immediately.
:returns: boolean value, whether lock was acquired successfully
"""
if self._locked:
return True
LOG.debug('Attempting to acquire lock on node %s', self.uuid)
if self._lock.acquire(blocking):
self._locked = True
LOG.debug('Successfully acquired lock on node %s', self.uuid)
return True
else:
LOG.debug('Unable to acquire lock on node %s', self.uuid)
return False
def release_lock(self):
"""Release a lock on a node.
Does nothing if lock was not acquired using this NodeInfo object.
"""
if self._locked:
LOG.debug('Successfully released lock on node %s', self.uuid)
self._lock.release()
self._locked = False
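# Usage sketch (not part of this change): acquire_lock() is a no-op when this
# NodeInfo already holds the lock, and a caller passing blocking=False can skip
# a busy node instead of waiting, e.g.:
#
#     if node_info.acquire_lock(blocking=False):
#         try:
#             ...  # handle the node
#         finally:
#             node_info.release_lock()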
@property
def options(self):
@ -98,6 +160,8 @@ class NodeInfo(object):
:param error: error message
"""
self.release_lock()
self.finished_at = time.time()
self.error = error
@ -136,11 +200,11 @@ class NodeInfo(object):
self._attributes = None
@classmethod
def from_row(cls, row, ironic=None):
def from_row(cls, row, ironic=None, lock=None):
"""Construct NodeInfo from a database row."""
fields = {key: row[key]
for key in ('uuid', 'started_at', 'finished_at', 'error')}
return cls(ironic=ironic, **fields)
return cls(ironic=ironic, lock=lock, **fields)
def invalidate_cache(self):
"""Clear all cached info, so that it's reloaded next time."""
@ -333,7 +397,8 @@ def delete_nodes_not_in_list(uuids):
LOG.warning(
_LW('Node %s was deleted from Ironic, dropping from Ironic '
'Inspector database'), uuid)
_delete_node(uuid)
with _get_lock_ctx(uuid):
_delete_node(uuid)
def _delete_node(uuid, session=None):
@ -362,23 +427,37 @@ def _list_node_uuids():
return {x.uuid for x in db.model_query(db.Node.uuid)}
def get_node(uuid, ironic=None):
def get_node(uuid, ironic=None, locked=False):
"""Get node from cache by it's UUID.
:param uuid: node UUID.
:param ironic: optional ironic client instance
:param locked: if True, get a lock on node before fetching its data
:returns: structure NodeInfo.
"""
row = db.model_query(db.Node).filter_by(uuid=uuid).first()
if row is None:
raise utils.Error(_('Could not find node %s in cache') % uuid,
code=404)
return NodeInfo.from_row(row, ironic=ironic)
if locked:
lock = _get_lock(uuid)
lock.acquire()
else:
lock = None
try:
row = db.model_query(db.Node).filter_by(uuid=uuid).first()
if row is None:
raise utils.Error(_('Could not find node %s in cache') % uuid,
code=404)
return NodeInfo.from_row(row, ironic=ironic, lock=lock)
except Exception:
with excutils.save_and_reraise_exception():
if lock is not None:
lock.release()
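# Sketch of the calling convention for the new ``locked`` flag (not part of
# this change; '1234' stands in for a real node UUID): the caller owns the lock
# carried by the returned NodeInfo and must release it when done, as clean_up()
# below does.
def _example_locked_get(uuid='1234'):
    node_info = get_node(uuid, locked=True)  # blocks until the node lock is free
    try:
        return node_info.finished_at         # read or update the node while locked
    finally:
        node_info.release_lock()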
def find_node(**attributes):
"""Find node in cache.
This function acquires a lock on a node.
:param attributes: attributes known about this node (like macs, BMC etc)
also ironic client instance may be passed under 'ironic'
:returns: structure NodeInfo with attributes ``uuid`` and ``created_at``
@ -417,20 +496,28 @@ def find_node(**attributes):
% {'attr': attributes, 'found': list(found)}, code=404)
uuid = found.pop()
row = (db.model_query(db.Node.started_at, db.Node.finished_at).
filter_by(uuid=uuid).first())
if not row:
raise utils.Error(_(
'Could not find node %s in introspection cache, '
'probably it\'s not on introspection now') % uuid, code=404)
if row.finished_at:
raise utils.Error(_(
'Introspection for node %(node)s already finished on '
'%(finish)s') % {'node': uuid, 'finish': row.finished_at})
return NodeInfo(uuid=uuid, started_at=row.started_at, ironic=ironic)
node_info = NodeInfo(uuid=uuid, ironic=ironic)
node_info.acquire_lock()
try:
row = (db.model_query(db.Node.started_at, db.Node.finished_at).
filter_by(uuid=uuid).first())
if not row:
raise utils.Error(_(
'Could not find node %s in introspection cache, '
'probably it\'s not on introspection now') % uuid, code=404)
if row.finished_at:
raise utils.Error(_(
'Introspection for node %(node)s already finished on '
'%(finish)s') % {'node': uuid, 'finish': row.finished_at})
node_info.started_at = row.started_at
return node_info
except Exception:
with excutils.save_and_reraise_exception():
node_info.release_lock()
def clean_up():
@ -461,15 +548,20 @@ def clean_up():
return []
LOG.error(_LE('Introspection for nodes %s has timed out'), uuids)
query = db.model_query(db.Node, session=session).filter(
db.Node.started_at < threshold,
db.Node.finished_at.is_(None))
query.update({'finished_at': time.time(),
'error': 'Introspection timeout'})
for u in uuids:
db.model_query(db.Attribute, session=session).filter_by(
uuid=u).delete()
db.model_query(db.Option, session=session).filter_by(
uuid=u).delete()
node_info = get_node(u, locked=True)
try:
if node_info.finished_at or node_info.started_at > threshold:
continue
db.model_query(db.Node, session=session).filter_by(
uuid=u).update({'finished_at': time.time(),
'error': 'Introspection timeout'})
db.model_query(db.Attribute, session=session).filter_by(
uuid=u).delete()
db.model_query(db.Option, session=session).filter_by(
uuid=u).delete()
finally:
node_info.release_lock()
return uuids
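# Note on the loop above (a reading of the change, not part of the diff):
# started_at and finished_at are re-checked after the per-node lock is taken
# because a node may have finished, or had introspection restarted, between the
# unlocked query that built ``uuids`` and the moment its lock is acquired; such
# nodes are skipped with ``continue`` instead of being marked as timed out.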


@ -85,6 +85,10 @@ def process(introspection_data):
'in hook %s') % hook_ext.name)
node_info = _find_node_info(introspection_data, failures)
if node_info:
# Locking is already done in find_node() but may not be done in a
# node_not_found hook
node_info.acquire_lock()
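# Note (not part of the change): this extra acquire_lock() is safe because it
# returns immediately when the NodeInfo from find_node() already holds the
# lock; it only actually waits for nodes created unlocked by a node_not_found
# hook.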
if failures and node_info:
msg = _('The following failures happened during running '


@ -14,6 +14,7 @@
import unittest
import mock
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log
@ -57,6 +58,7 @@ class BaseTest(unittest.TestCase):
engine.connect()
self.addCleanup(db.get_engine().dispose)
plugins_base._HOOKS_MGR = None
node_cache._SEMAPHORES = lockutils.Semaphores()
for name in ('_', '_LI', '_LW', '_LE', '_LC'):
patch = mock.patch.object(i18n, name, lambda s: s)
patch.start()


@ -82,8 +82,10 @@ class TestIntrospect(BaseTest):
persistent=False)
cli.node.set_power_state.assert_called_once_with(self.uuid,
'reboot')
add_mock.return_value.set_option.assert_called_once_with(
self.node_info.set_option.assert_called_once_with(
'new_ipmi_credentials', None)
self.node_info.acquire_lock.assert_called_once_with()
self.node_info.release_lock.assert_called_once_with()
def test_ok_ilo_and_drac(self, client_mock, add_mock, filters_mock):
cli = self._prepare(client_mock)
@ -117,6 +119,8 @@ class TestIntrospect(BaseTest):
'reboot')
add_mock.return_value.finished.assert_called_once_with(
error=mock.ANY)
self.node_info.acquire_lock.assert_called_once_with()
self.node_info.release_lock.assert_called_once_with()
def test_unexpected_error(self, client_mock, add_mock, filters_mock):
cli = self._prepare(client_mock)
@ -133,6 +137,8 @@ class TestIntrospect(BaseTest):
self.assertFalse(cli.node.set_boot_device.called)
add_mock.return_value.finished.assert_called_once_with(
error=mock.ANY)
self.node_info.acquire_lock.assert_called_once_with()
self.node_info.release_lock.assert_called_once_with()
def test_with_maintenance(self, client_mock, add_mock, filters_mock):
cli = client_mock.return_value
@ -194,6 +200,8 @@ class TestIntrospect(BaseTest):
self.node_info.finished.assert_called_once_with(error=mock.ANY)
self.assertEqual(0, filters_mock.call_count)
self.assertEqual(0, cli.node.set_power_state.call_count)
self.node_info.acquire_lock.assert_called_once_with()
self.node_info.release_lock.assert_called_once_with()
def test_no_lookup_attrs_with_node_not_found_hook(self, client_mock,
add_mock, filters_mock):
@ -229,6 +237,7 @@ class TestIntrospect(BaseTest):
self.assertEqual(0, filters_mock.call_count)
self.assertEqual(0, cli.node.set_power_state.call_count)
self.assertFalse(add_mock.called)
self.assertFalse(self.node_info.acquire_lock.called)
def test_failed_to_validate_node(self, client_mock, add_mock,
filters_mock):
@ -247,6 +256,7 @@ class TestIntrospect(BaseTest):
self.assertEqual(0, filters_mock.call_count)
self.assertEqual(0, cli.node.set_power_state.call_count)
self.assertFalse(add_mock.called)
self.assertFalse(self.node_info.acquire_lock.called)
def test_wrong_provision_state(self, client_mock, add_mock, filters_mock):
self.node.provision_state = 'active'
@ -261,6 +271,7 @@ class TestIntrospect(BaseTest):
self.assertEqual(0, filters_mock.call_count)
self.assertEqual(0, cli.node.set_power_state.call_count)
self.assertFalse(add_mock.called)
self.assertFalse(self.node_info.acquire_lock.called)
@mock.patch.object(time, 'sleep')
@mock.patch.object(time, 'time')


@ -41,6 +41,7 @@ class TestNodeCache(test_base.NodeTest):
bmc_address='1.2.3.4', foo=None)
self.assertEqual(self.uuid, res.uuid)
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 60)
self.assertFalse(res._locked)
res = (db.model_query(db.Node.uuid,
db.Node.started_at).order_by(db.Node.uuid).all())
@ -80,10 +81,12 @@ class TestNodeCache(test_base.NodeTest):
uuid=self.uuid).first()
self.assertIsNone(row_option)
@mock.patch.object(node_cache, '_get_lock_ctx', autospec=True)
@mock.patch.object(node_cache, '_list_node_uuids')
@mock.patch.object(node_cache, '_delete_node')
def test_delete_nodes_not_in_list(self, mock__delete_node,
mock__list_node_uuids):
mock__list_node_uuids,
mock__get_lock_ctx):
uuid2 = 'uuid2'
uuids = {self.uuid}
mock__list_node_uuids.return_value = {self.uuid, uuid2}
@ -91,6 +94,8 @@ class TestNodeCache(test_base.NodeTest):
with session.begin():
node_cache.delete_nodes_not_in_list(uuids)
mock__delete_node.assert_called_once_with(uuid2)
mock__get_lock_ctx.assert_called_once_with(uuid2)
mock__get_lock_ctx.return_value.__enter__.assert_called_once_with()
def test_add_node_duplicate_mac(self):
session = db.get_session()
@ -178,11 +183,13 @@ class TestNodeCacheFind(test_base.NodeTest):
res = node_cache.find_node(bmc_address='1.2.3.4')
self.assertEqual(self.uuid, res.uuid)
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1)
self.assertTrue(res._locked)
def test_macs(self):
res = node_cache.find_node(mac=['11:22:33:33:33:33', self.macs[1]])
self.assertEqual(self.uuid, res.uuid)
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1)
self.assertTrue(res._locked)
def test_macs_not_found(self):
self.assertRaises(utils.Error, node_cache.find_node,
@ -199,6 +206,7 @@ class TestNodeCacheFind(test_base.NodeTest):
mac=self.macs)
self.assertEqual(self.uuid, res.uuid)
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1)
self.assertTrue(res._locked)
def test_inconsistency(self):
session = db.get_session()
@ -244,8 +252,9 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
db.model_query(db.Attribute).count())
self.assertEqual(1, db.model_query(db.Option).count())
@mock.patch.object(node_cache, '_get_lock', autospec=True)
@mock.patch.object(time, 'time')
def test_ok(self, time_mock):
def test_ok(self, time_mock, get_lock_mock):
time_mock.return_value = 1000
self.assertFalse(node_cache.clean_up())
@ -256,9 +265,11 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
self.assertEqual(len(self.macs),
db.model_query(db.Attribute).count())
self.assertEqual(1, db.model_query(db.Option).count())
self.assertFalse(get_lock_mock.called)
@mock.patch.object(node_cache, '_get_lock', autospec=True)
@mock.patch.object(time, 'time')
def test_timeout(self, time_mock):
def test_timeout(self, time_mock, get_lock_mock):
# Add a finished node to confirm we don't try to timeout it
time_mock.return_value = self.started_at
session = db.get_session()
@ -277,6 +288,8 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
res)
self.assertEqual([], db.model_query(db.Attribute).all())
self.assertEqual([], db.model_query(db.Option).all())
get_lock_mock.assert_called_once_with(self.uuid)
get_lock_mock.return_value.acquire.assert_called_once_with()
def test_old_status(self):
CONF.set_override('node_status_keep_time', 42)
@ -302,6 +315,20 @@ class TestNodeCacheGetNode(test_base.NodeTest):
self.assertEqual(started_at, info.started_at)
self.assertIsNone(info.finished_at)
self.assertIsNone(info.error)
self.assertFalse(info._locked)
def test_locked(self):
started_at = time.time() - 42
session = db.get_session()
with session.begin():
db.Node(uuid=self.uuid, started_at=started_at).save(session)
info = node_cache.get_node(self.uuid, locked=True)
self.assertEqual(self.uuid, info.uuid)
self.assertEqual(started_at, info.started_at)
self.assertIsNone(info.finished_at)
self.assertIsNone(info.error)
self.assertTrue(info._locked)
def test_not_found(self):
self.assertRaises(utils.Error, node_cache.get_node, 'foo')
@ -343,6 +370,11 @@ class TestNodeInfoFinished(test_base.NodeTest):
self.assertEqual([], db.model_query(db.Attribute).all())
self.assertEqual([], db.model_query(db.Option).all())
def test_release_lock(self):
self.node_info.acquire_lock()
self.node_info.finished()
self.assertFalse(self.node_info._locked)
class TestInit(unittest.TestCase):
def setUp(self):
@ -574,3 +606,43 @@ class TestNodeCacheGetByPath(test_base.NodeTest):
self.assertEqual(42, self.node_info.get_by_path('/properties/answer'))
self.assertRaises(KeyError, self.node_info.get_by_path, '/foo')
self.assertRaises(KeyError, self.node_info.get_by_path, '/extra/foo')
@mock.patch.object(node_cache, '_get_lock', autospec=True)
class TestLock(test_base.NodeTest):
def test_acquire(self, get_lock_mock):
node_info = node_cache.NodeInfo(self.uuid)
self.assertFalse(node_info._locked)
get_lock_mock.assert_called_once_with(self.uuid)
self.assertFalse(get_lock_mock.return_value.acquire.called)
self.assertTrue(node_info.acquire_lock())
self.assertTrue(node_info._locked)
self.assertTrue(node_info.acquire_lock())
self.assertTrue(node_info._locked)
get_lock_mock.return_value.acquire.assert_called_once_with(True)
def test_release(self, get_lock_mock):
node_info = node_cache.NodeInfo(self.uuid)
node_info.acquire_lock()
self.assertTrue(node_info._locked)
node_info.release_lock()
self.assertFalse(node_info._locked)
node_info.release_lock()
self.assertFalse(node_info._locked)
get_lock_mock.return_value.acquire.assert_called_once_with(True)
get_lock_mock.return_value.release.assert_called_once_with()
def test_acquire_non_blocking(self, get_lock_mock):
node_info = node_cache.NodeInfo(self.uuid)
self.assertFalse(node_info._locked)
get_lock_mock.return_value.acquire.side_effect = iter([False, True])
self.assertFalse(node_info.acquire_lock(blocking=False))
self.assertFalse(node_info._locked)
self.assertTrue(node_info.acquire_lock(blocking=False))
self.assertTrue(node_info._locked)
self.assertTrue(node_info.acquire_lock(blocking=False))
self.assertTrue(node_info._locked)
get_lock_mock.return_value.acquire.assert_called_with(False)
self.assertEqual(2, get_lock_mock.return_value.acquire.call_count)


@ -0,0 +1,3 @@
---
fixes:
- Acquire a lock on a node UUID when handling it.


@ -13,6 +13,7 @@ pbr>=1.6
python-ironicclient>=0.8.0
python-keystoneclient!=1.8.0,>=1.6.0
python-swiftclient>=2.2.0
oslo.concurrency>=2.3.0 # Apache-2.0
oslo.config>=2.7.0 # Apache-2.0
oslo.db>=3.2.0 # Apache-2.0
oslo.i18n>=1.5.0 # Apache-2.0