Merge "Allocation API: taking over allocations of offline conductors"
This commit is contained in:
commit
cffd49ef35
@ -3526,6 +3526,37 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
|
||||
LOG.info('Successfully deleted allocation %s', allocation.uuid)
|
||||
|
||||
@METRICS.timer('ConductorManager._check_orphan_allocations')
|
||||
@periodics.periodic(
|
||||
spacing=CONF.conductor.check_allocations_interval,
|
||||
enabled=CONF.conductor.check_allocations_interval > 0)
|
||||
def _check_orphan_allocations(self, context):
|
||||
"""Periodically checks the status of allocations that were taken over.
|
||||
|
||||
Periodically checks the allocations assigned to a conductor that
|
||||
went offline, tries to take them over and finish.
|
||||
|
||||
:param context: request context.
|
||||
"""
|
||||
offline_conductors = self.dbapi.get_offline_conductors(field='id')
|
||||
for conductor_id in offline_conductors:
|
||||
filters = {'state': states.ALLOCATING,
|
||||
'conductor_affinity': conductor_id}
|
||||
for allocation in objects.Allocation.list(context,
|
||||
filters=filters):
|
||||
try:
|
||||
if not self.dbapi.take_over_allocation(allocation.id,
|
||||
conductor_id,
|
||||
self.conductor.id):
|
||||
# Another conductor has taken over, skipping
|
||||
continue
|
||||
|
||||
LOG.debug('Taking over allocation %s', allocation.uuid)
|
||||
allocations.do_allocate(context, allocation)
|
||||
except Exception:
|
||||
LOG.exception('Unexpected exception when taking over '
|
||||
'allocation %s', allocation.uuid)
|
||||
|
||||
|
||||
@METRICS.timer('get_vendor_passthru_metadata')
|
||||
def get_vendor_passthru_metadata(route_dict):
|
||||
|
@ -63,6 +63,11 @@ opts = [
|
||||
min=1,
|
||||
help=_('Interval (seconds) between checks of rescue '
|
||||
'timeouts.')),
|
||||
cfg.IntOpt('check_allocations_interval',
|
||||
default=60,
|
||||
min=0,
|
||||
help=_('Interval between checks of orphaned allocations, '
|
||||
'in seconds. Set to 0 to disable checks.')),
|
||||
cfg.IntOpt('deploy_callback_timeout',
|
||||
default=1800,
|
||||
help=_('Timeout (seconds) to wait for a callback from '
|
||||
|
@ -549,10 +549,11 @@ class Connection(object):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_offline_conductors(self):
|
||||
"""Get a list conductor hostnames that are offline (dead).
|
||||
def get_offline_conductors(self, field='hostname'):
|
||||
"""Get a list conductors that are offline (dead).
|
||||
|
||||
:returns: A list of conductor hostnames.
|
||||
:param field: A field to return, hostname by default.
|
||||
:returns: A list of requested fields of offline conductors.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
@ -1158,6 +1159,23 @@ class Connection(object):
|
||||
:raises: NodeAssociated
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def take_over_allocation(self, allocation_id, old_conductor_id,
|
||||
new_conductor_id):
|
||||
"""Do a take over for an allocation.
|
||||
|
||||
The allocation is only updated if the old conductor matches the
|
||||
provided value, thus guarding against races.
|
||||
|
||||
:param allocation_id: Allocation ID
|
||||
:param old_conductor_id: The conductor ID we expect to be the current
|
||||
``conductor_affinity`` of the allocation.
|
||||
:param new_conductor_id: The conductor ID of the new
|
||||
``conductor_affinity``.
|
||||
:returns: True if the take over was successful, False otherwise.
|
||||
:raises: AllocationNotFound
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def destroy_allocation(self, allocation_id):
|
||||
"""Destroy an allocation.
|
||||
|
@ -993,10 +993,11 @@ class Connection(api.Connection):
|
||||
d2c[key].add(cdr_row['hostname'])
|
||||
return d2c
|
||||
|
||||
def get_offline_conductors(self):
|
||||
def get_offline_conductors(self, field='hostname'):
|
||||
field = getattr(models.Conductor, field)
|
||||
interval = CONF.conductor.heartbeat_timeout
|
||||
limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
|
||||
result = (model_query(models.Conductor.hostname)
|
||||
result = (model_query(field)
|
||||
.filter(models.Conductor.updated_at < limit))
|
||||
return [row[0] for row in result]
|
||||
|
||||
@ -1750,6 +1751,39 @@ class Connection(api.Connection):
|
||||
raise
|
||||
return ref
|
||||
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def take_over_allocation(self, allocation_id, old_conductor_id,
|
||||
new_conductor_id):
|
||||
"""Do a take over for an allocation.
|
||||
|
||||
The allocation is only updated if the old conductor matches the
|
||||
provided value, thus guarding against races.
|
||||
|
||||
:param allocation_id: Allocation ID
|
||||
:param old_conductor_id: The conductor ID we expect to be the current
|
||||
``conductor_affinity`` of the allocation.
|
||||
:param new_conductor_id: The conductor ID of the new
|
||||
``conductor_affinity``.
|
||||
:returns: True if the take over was successful, False otherwise.
|
||||
:raises: AllocationNotFound
|
||||
"""
|
||||
with _session_for_write() as session:
|
||||
try:
|
||||
query = model_query(models.Allocation, session=session)
|
||||
query = add_identity_filter(query, allocation_id)
|
||||
# NOTE(dtantsur): the FOR UPDATE clause locks the allocation
|
||||
ref = query.with_for_update().one()
|
||||
if ref.conductor_affinity != old_conductor_id:
|
||||
# Race detected, bailing out
|
||||
return False
|
||||
|
||||
ref.update({'conductor_affinity': new_conductor_id})
|
||||
session.flush()
|
||||
except NoResultFound:
|
||||
raise exception.AllocationNotFound(allocation=allocation_id)
|
||||
else:
|
||||
return True
|
||||
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def destroy_allocation(self, allocation_id):
|
||||
"""Destroy an allocation.
|
||||
|
@ -168,6 +168,37 @@ class AllocationTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
self.assertEqual(allocation.uuid, actual.uuid)
|
||||
self.assertIsInstance(allocation, objects.Allocation)
|
||||
|
||||
@mock.patch.object(allocations, 'do_allocate', autospec=True)
|
||||
def test_check_orphaned_allocations(self, mock_allocate):
|
||||
alive_conductor = obj_utils.create_test_conductor(
|
||||
self.context, id=42, hostname='alive')
|
||||
dead_conductor = obj_utils.create_test_conductor(
|
||||
self.context, id=43, hostname='dead')
|
||||
|
||||
obj_utils.create_test_allocation(
|
||||
self.context,
|
||||
state='allocating',
|
||||
conductor_affinity=alive_conductor.id)
|
||||
allocation = obj_utils.create_test_allocation(
|
||||
self.context,
|
||||
state='allocating',
|
||||
conductor_affinity=dead_conductor.id)
|
||||
|
||||
self._start_service()
|
||||
with mock.patch.object(self.dbapi, 'get_offline_conductors',
|
||||
autospec=True) as mock_conds:
|
||||
mock_conds.return_value = [dead_conductor.id]
|
||||
self.service._check_orphan_allocations(self.context)
|
||||
|
||||
mock_allocate.assert_called_once_with(self.context, mock.ANY)
|
||||
actual = mock_allocate.call_args[0][1]
|
||||
self.assertEqual(allocation.uuid, actual.uuid)
|
||||
self.assertIsInstance(allocation, objects.Allocation)
|
||||
|
||||
allocation = self.dbapi.get_allocation_by_id(allocation.id)
|
||||
self.assertEqual(self.service.conductor.id,
|
||||
allocation.conductor_affinity)
|
||||
|
||||
|
||||
@mock.patch('time.sleep', lambda _: None)
|
||||
class DoAllocateTestCase(db_base.DbTestCase):
|
||||
|
@ -244,6 +244,31 @@ class AllocationsTestCase(base.DbTestCase):
|
||||
self.assertIsNone(node.instance_uuid)
|
||||
self.assertNotIn('traits', node.instance_info)
|
||||
|
||||
def test_take_over_success(self):
|
||||
for i in range(2):
|
||||
db_utils.create_test_conductor(id=i, hostname='host-%d' % i)
|
||||
allocation = db_utils.create_test_allocation(conductor_affinity=0)
|
||||
|
||||
self.assertTrue(self.dbapi.take_over_allocation(
|
||||
allocation.id, old_conductor_id=0, new_conductor_id=1))
|
||||
allocation = self.dbapi.get_allocation_by_id(allocation.id)
|
||||
self.assertEqual(1, allocation.conductor_affinity)
|
||||
|
||||
def test_take_over_conflict(self):
|
||||
for i in range(3):
|
||||
db_utils.create_test_conductor(id=i, hostname='host-%d' % i)
|
||||
allocation = db_utils.create_test_allocation(conductor_affinity=2)
|
||||
|
||||
self.assertFalse(self.dbapi.take_over_allocation(
|
||||
allocation.id, old_conductor_id=0, new_conductor_id=1))
|
||||
allocation = self.dbapi.get_allocation_by_id(allocation.id)
|
||||
# The affinity was not changed
|
||||
self.assertEqual(2, allocation.conductor_affinity)
|
||||
|
||||
def test_take_over_allocation_not_found(self):
|
||||
self.assertRaises(exception.AllocationNotFound,
|
||||
self.dbapi.take_over_allocation, 999, 0, 1)
|
||||
|
||||
def test_create_allocation_duplicated_name(self):
|
||||
self.assertRaises(exception.AllocationDuplicateName,
|
||||
db_utils.create_test_allocation,
|
||||
|
@ -334,6 +334,7 @@ class DbConductorTestCase(base.DbTestCase):
|
||||
# 61 seconds passed since last heartbeat, it's dead
|
||||
mock_utcnow.return_value = time_ + datetime.timedelta(seconds=61)
|
||||
self.assertEqual([c.hostname], self.dbapi.get_offline_conductors())
|
||||
self.assertEqual([c.id], self.dbapi.get_offline_conductors(field='id'))
|
||||
|
||||
@mock.patch.object(timeutils, 'utcnow', autospec=True)
|
||||
def test_get_online_conductors(self, mock_utcnow):
|
||||
|
Loading…
Reference in New Issue
Block a user