Merge "Periodically checks the status of nodes in DEPLOYING state"
This commit is contained in:
commit
db554f8066
|
@ -1152,6 +1152,60 @@ class ConductorManager(periodic_task.PeriodicTasks):
|
|||
self._fail_if_in_state(context, filters, states.DEPLOYWAIT,
|
||||
sort_key, callback_method, err_handler)
|
||||
|
||||
@periodic_task.periodic_task(
    spacing=CONF.conductor.check_provision_state_interval)
def _check_deploying_status(self, context):
    """Periodically checks the status of nodes in DEPLOYING state.

    Periodically checks the nodes in DEPLOYING and the state of the
    conductor deploying them. If we find out that a conductor that
    was provisioning the node has died we then release the
    node and gracefully mark the deployment as failed.

    :param context: request context.
    """
    offline_conductors = self.dbapi.get_offline_conductors()
    # Nothing to recover unless some conductor actually went offline.
    if not offline_conductors:
        return

    # 'reservation' holds the hostname of the conductor that locked
    # the node, so it unpacks below as conductor_hostname.
    node_iter = self.iter_nodes(
        fields=['id', 'reservation'],
        filters={'provision_state': states.DEPLOYING,
                 'maintenance': False})

    # NOTE: iter_nodes() returns a generator, which is always truthy;
    # the original "if not node_iter: return" guard could never fire
    # and has been removed. Iterating an empty generator is a no-op.
    for node_uuid, driver, node_id, conductor_hostname in node_iter:
        # Skip nodes whose owning conductor is still alive.
        if conductor_hostname not in offline_conductors:
            continue

        # NOTE(lucasagomes): Although very rare, this may lead to a
        # race condition. By the time we release the lock the conductor
        # that was previously managing the node could be back online.
        try:
            objects.Node.release(context, conductor_hostname, node_id)
        except exception.NodeNotFound:
            LOG.warning(_LW("During checking for deploying state, node "
                            "%s was not found and presumed deleted by "
                            "another process. Skipping."), node_uuid)
            continue
        except exception.NodeLocked:
            LOG.warning(_LW("During checking for deploying state, when "
                            "releasing the lock of the node %s, it was "
                            "locked by another process. Skipping."),
                        node_uuid)
            continue
        except exception.NodeNotLocked:
            # Already unlocked (e.g. by the returning conductor): safe
            # to proceed and fail the stuck deployment anyway.
            LOG.warning(_LW("During checking for deploying state, when "
                            "releasing the lock of the node %s, it was "
                            "already unlocked."), node_uuid)

        # Mark the deployment as failed if the node is still DEPLOYING.
        self._fail_if_in_state(
            context, {'id': node_id}, states.DEPLOYING,
            'provision_updated_at',
            callback_method=utils.cleanup_after_timeout,
            err_handler=provisioning_error_handler)
|
||||
|
||||
def _do_takeover(self, task):
|
||||
"""Take over this node.
|
||||
|
||||
|
|
|
@ -393,3 +393,10 @@ class Connection(object):
|
|||
{driverA: set([host1, host2]),
|
||||
driverB: set([host2, host3])}
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
def get_offline_conductors(self):
    """Get a list of conductor hostnames that are offline (dead).

    A conductor is considered offline when it has not refreshed its
    heartbeat (its DB record's ``updated_at``) within the configured
    timeout — see the concrete implementation.

    :returns: A list of conductor hostnames.
    """
|
||||
|
|
|
@ -604,3 +604,11 @@ class Connection(api.Connection):
|
|||
for driver in row['drivers']:
|
||||
d2c[driver].add(row['hostname'])
|
||||
return d2c
|
||||
|
||||
def get_offline_conductors(self):
    """Get a list of conductor hostnames that are offline (dead).

    A conductor is considered dead when its row's ``updated_at``
    timestamp (refreshed by its heartbeat) is older than
    ``[conductor]heartbeat_timeout`` seconds.

    :returns: A list of conductor hostnames.
    """
    interval = CONF.conductor.heartbeat_timeout
    # Rows last updated before this instant are past the timeout.
    limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
    # NOTE: the original chained a bare .filter_by() with no criteria,
    # which is a no-op; it has been dropped.
    result = (model_query(models.Conductor)
              .filter(models.Conductor.updated_at < limit)
              .all())
    return [row['hostname'] for row in result]
|
||||
|
|
|
@ -4123,3 +4123,105 @@ class DestroyPortTestCase(_ServiceSetUpMixin, tests_db_base.DbTestCase):
|
|||
self.context, port)
|
||||
# Compare true exception hidden by @messaging.expected_exceptions
|
||||
self.assertEqual(exception.NodeLocked, exc.exc_info[0])
|
||||
|
||||
|
||||
@mock.patch.object(manager.ConductorManager, '_fail_if_in_state')
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@mock.patch.object(dbapi.IMPL, 'get_offline_conductors')
class ManagerCheckDeployingStatusTestCase(_ServiceSetUpMixin,
                                          tests_db_base.DbTestCase):
    """Tests for ConductorManager._check_deploying_status().

    The class-level patches are injected into every test method in
    reverse decorator order: mock_off_cond (get_offline_conductors),
    mock_mapped (_mapped_to_this_conductor), mock_fail_if
    (_fail_if_in_state).
    """

    def setUp(self):
        super(ManagerCheckDeployingStatusTestCase, self).setUp()
        self.service = manager.ConductorManager('hostname', 'test-topic')
        self.service.dbapi = self.dbapi

        self._start_service()

        # A DEPLOYING node reserved by the conductor that the tests
        # will declare offline.
        self.node = obj_utils.create_test_node(
            self.context, id=1, uuid=uuidutils.generate_uuid(),
            driver='fake', provision_state=states.DEPLOYING,
            target_provision_state=states.DEPLOYDONE,
            reservation='fake-conductor')

        # create a second node in a different state to test the
        # filtering nodes in DEPLOYING state
        obj_utils.create_test_node(
            self.context, id=10, uuid=uuidutils.generate_uuid(),
            driver='fake', provision_state=states.AVAILABLE,
            target_provision_state=states.NOSTATE)

        self.expected_filter = {
            'provision_state': 'deploying', 'reserved': False,
            'maintenance': False}

    def test__check_deploying_status(self, mock_off_cond, mock_mapped,
                                     mock_fail_if):
        # The conductor holding the reservation is reported offline.
        mock_off_cond.return_value = ['fake-conductor']

        self.service._check_deploying_status(self.context)

        self.node.refresh()
        mock_off_cond.assert_called_once_with()
        mock_mapped.assert_called_once_with(self.node.uuid, 'fake')
        # The stuck deployment must be failed via _fail_if_in_state.
        mock_fail_if.assert_called_once_with(
            mock.ANY, {'id': self.node.id}, states.DEPLOYING,
            'provision_updated_at',
            callback_method=conductor_utils.cleanup_after_timeout,
            err_handler=manager.provisioning_error_handler)
        # assert node was released
        self.assertIsNone(self.node.reservation)

    def test__check_deploying_status_alive(self, mock_off_cond,
                                           mock_mapped, mock_fail_if):
        # No offline conductors: the periodic task returns early.
        mock_off_cond.return_value = []

        self.service._check_deploying_status(self.context)

        self.node.refresh()
        mock_off_cond.assert_called_once_with()
        self.assertFalse(mock_mapped.called)
        self.assertFalse(mock_fail_if.called)
        # assert node still locked
        self.assertIsNotNone(self.node.reservation)

    @mock.patch.object(objects.Node, 'release')
    def test__check_deploying_status_release_exceptions_skipping(
            self, mock_release, mock_off_cond, mock_mapped, mock_fail_if):
        mock_off_cond.return_value = ['fake-conductor']
        # Add another node so we can check both exceptions
        node2 = obj_utils.create_test_node(
            self.context, id=2, uuid=uuidutils.generate_uuid(),
            driver='fake', provision_state=states.DEPLOYING,
            target_provision_state=states.DEPLOYDONE,
            reservation='fake-conductor')

        mock_mapped.return_value = True
        # First node hits NodeNotFound, second hits NodeLocked; both
        # must be skipped without failing the deployment.
        mock_release.side_effect = iter([exception.NodeNotFound('not found'),
                                         exception.NodeLocked('locked')])
        self.service._check_deploying_status(self.context)

        self.node.refresh()
        mock_off_cond.assert_called_once_with()
        expected_calls = [mock.call(self.node.uuid, 'fake'),
                          mock.call(node2.uuid, 'fake')]
        mock_mapped.assert_has_calls(expected_calls)
        # Assert we skipped and didn't try to call _fail_if_in_state
        self.assertFalse(mock_fail_if.called)

    @mock.patch.object(objects.Node, 'release')
    def test__check_deploying_status_release_node_not_locked(
            self, mock_release, mock_off_cond, mock_mapped, mock_fail_if):
        mock_off_cond.return_value = ['fake-conductor']
        mock_mapped.return_value = True
        # NodeNotLocked is benign: processing continues and the
        # deployment is still failed.
        mock_release.side_effect = iter([
            exception.NodeNotLocked('not locked')])
        self.service._check_deploying_status(self.context)

        self.node.refresh()
        mock_off_cond.assert_called_once_with()
        mock_mapped.assert_called_once_with(self.node.uuid, 'fake')
        mock_fail_if.assert_called_once_with(
            mock.ANY, {'id': self.node.id}, states.DEPLOYING,
            'provision_updated_at',
            callback_method=conductor_utils.cleanup_after_timeout,
            err_handler=manager.provisioning_error_handler)
|
||||
|
|
|
@ -200,3 +200,20 @@ class DbConductorTestCase(base.DbTestCase):
|
|||
expected = {d: set([h1, h2]), d1: set([h1]), d2: set([h2])}
|
||||
result = self.dbapi.get_active_driver_dict(interval=two_minute)
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
@mock.patch.object(timeutils, 'utcnow', autospec=True)
def test_get_offline_conductors(self, mock_utcnow):
    """Conductors past the heartbeat timeout are reported offline."""
    self.config(heartbeat_timeout=60, group='conductor')
    base_time = datetime.datetime(2000, 1, 1, 0, 0)
    mock_utcnow.return_value = base_time

    conductor = self._create_test_cdr()

    def _advance(seconds):
        # Pretend the wall clock moved forward by the given amount.
        mock_utcnow.return_value = (
            base_time + datetime.timedelta(seconds=seconds))

    # 30 seconds since the last heartbeat: still within the 60 second
    # timeout, so the conductor counts as alive.
    _advance(30)
    self.assertEqual([], self.dbapi.get_offline_conductors())

    # 61 seconds since the last heartbeat: past the timeout, dead.
    _advance(61)
    self.assertEqual([conductor.hostname],
                     self.dbapi.get_offline_conductors())
|
||||
|
|
Loading…
Reference in New Issue