Merge "Retry /reshape at provider generation conflict"
commit 0bab2e5a88 (nova)
@@ -1195,9 +1195,16 @@ class ResourceTracker(object):
 
         return list(traits)
 
-    @retrying.retry(stop_max_attempt_number=4,
-                    retry_on_exception=lambda e: isinstance(
-                        e, exception.ResourceProviderUpdateConflict))
+    @retrying.retry(
+        stop_max_attempt_number=4,
+        retry_on_exception=lambda e: isinstance(
+            e,
+            (
+                exception.ResourceProviderUpdateConflict,
+                exception.PlacementReshapeConflict,
+            ),
+        ),
+    )
     def _update_to_placement(self, context, compute_node, startup):
         """Send resource and inventory changes to placement."""
         # NOTE(jianghuaw): Some resources(e.g. VGPU) are not saved in the
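The decorator change above depends on how the retrying library drives retries: the decorated _update_to_placement is re-invoked, up to stop_max_attempt_number attempts, whenever retry_on_exception returns True for the raised exception, which now also covers PlacementReshapeConflict. Below is a minimal standalone sketch of that behavior, assuming the retrying package is installed; FakeConflict and flaky_update are illustrative names, not Nova code.

import retrying


class FakeConflict(Exception):
    """Stand-in for a placement generation conflict."""


attempts = {'count': 0}


@retrying.retry(
    stop_max_attempt_number=4,
    retry_on_exception=lambda e: isinstance(e, FakeConflict))
def flaky_update():
    # Fail the first two attempts, succeed on the third, mimicking a
    # conflict that goes away once fresh data is used.
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise FakeConflict()
    return 'synced'


print(flaky_update(), attempts['count'])  # prints: synced 3

Exceptions for which the predicate returns False propagate immediately, and if all four attempts fail the last exception is re-raised.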
@@ -2088,6 +2088,16 @@ class ResourceProviderUpdateConflict(PlacementAPIConflict):
                 "provider %(uuid)s (generation %(generation)d): %(error)s")
 
 
+class PlacementReshapeConflict(PlacementAPIConflict):
+    """A 409 caused by generation mismatch from attempting to reshape a
+    provider tree.
+    """
+    msg_fmt = _(
+        "A conflict was encountered attempting to reshape a provider tree: "
+        "%(error)s"
+    )
+
+
 class InvalidResourceClass(Invalid):
     msg_fmt = _("Resource class '%(resource_class)s' invalid.")
 
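One note on the %(error)s placeholder: Nova exceptions interpolate the keyword arguments passed at raise time into msg_fmt with Python %-formatting, which is why the client code later in this change can raise PlacementReshapeConflict(error=resp.text). A rough standalone sketch of that mechanism, using the simplified FakePlacementConflict below rather than Nova's actual NovaException base class.

class FakePlacementConflict(Exception):
    """Illustrative stand-in for an oslo-style exception with msg_fmt."""
    msg_fmt = ("A conflict was encountered attempting to reshape a "
               "provider tree: %(error)s")

    def __init__(self, **kwargs):
        # Interpolate the keyword arguments into the message template.
        super().__init__(self.msg_fmt % kwargs)


print(FakePlacementConflict(error='generation mismatch'))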
@@ -1277,6 +1277,11 @@ class SchedulerReportClient(object):
         resp = self.post('/reshaper', payload, version=RESHAPER_VERSION,
                          global_request_id=context.global_id)
         if not resp:
+            if resp.status_code == 409:
+                err = resp.json()['errors'][0]
+                if err['code'] == 'placement.concurrent_update':
+                    raise exception.PlacementReshapeConflict(error=resp.text)
+
             raise exception.ReshapeFailed(error=resp.text)
 
         return resp
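The new branch above separates a retryable placement 409 from any other reshape failure by inspecting the error code in the response body. A simplified standalone sketch of that classification logic follows; classify_reshape_error and the two exception classes are hypothetical stand-ins, not the SchedulerReportClient API.

class ReshapeFailed(Exception):
    """Fatal reshape error (stand-in)."""


class PlacementReshapeConflict(Exception):
    """Retryable generation conflict (stand-in)."""


def classify_reshape_error(status_code, body):
    """Raise the exception matching a failed POST /reshaper response."""
    if status_code == 409:
        errs = body.get('errors') or [{}]
        if errs[0].get('code') == 'placement.concurrent_update':
            # Stale provider generation: signal a retryable conflict.
            raise PlacementReshapeConflict(errs[0].get('detail', ''))
    # Anything else stays a hard failure.
    raise ReshapeFailed(body)


try:
    classify_reshape_error(
        409, {'errors': [{'code': 'placement.concurrent_update',
                          'detail': 'generation conflict'}]})
except PlacementReshapeConflict as e:
    print('retryable:', e)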
@@ -1310,7 +1315,7 @@ class SchedulerReportClient(object):
         # failure here to be fatal to the caller.
         try:
             self._reshape(context, inventories, allocations)
-        except exception.ReshapeFailed:
+        except (exception.ReshapeFailed, exception.PlacementReshapeConflict):
             raise
         except Exception as e:
             # Make sure the original stack trace gets logged.
@@ -1429,8 +1434,16 @@ class SchedulerReportClient(object):
         if allocations is not None:
             # NOTE(efried): We do not catch_all here, because ReshapeFailed
             # needs to bubble up right away and be handled specially.
-            self._set_up_and_do_reshape(context, old_tree, new_tree,
-                                        allocations)
+            try:
+                self._set_up_and_do_reshape(
+                    context, old_tree, new_tree, allocations)
+            except exception.PlacementReshapeConflict:
+                # The conflict means we need to invalidate the local caches
+                # and let the retry mechanism in _update_to_placement
+                # re-drive the reshape on top of the fresh data.
+                with excutils.save_and_reraise_exception():
+                    self.clear_provider_cache()
+
             # The reshape updated provider generations, so the ones we have in
             # the cache are now stale. The inventory update below will short
             # out, but we would still bounce with a provider generation
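The conflict handler above leans on oslo_utils.excutils.save_and_reraise_exception so the stale provider cache can be dropped without swallowing the exception that the retry decorator needs to see. A minimal sketch of that cleanup-then-reraise pattern, assuming oslo.utils is installed; FakeProviderCache and do_reshape are illustrative, not Nova's cache or reshape code.

from oslo_utils import excutils


class FakeProviderCache:
    """Illustrative stand-in for the local provider tree cache."""
    def __init__(self):
        self.data = {'rp-uuid': {'generation': 41}}

    def clear(self):
        self.data = {}


cache = FakeProviderCache()


def do_reshape():
    raise RuntimeError('placement generation conflict')


try:
    try:
        do_reshape()
    except RuntimeError:
        # Drop the stale generations, then let the original exception
        # bubble up so an outer retry can re-drive the update.
        with excutils.save_and_reraise_exception():
            cache.clear()
except RuntimeError as e:
    print('re-raised after cache clear:', e, cache.data)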
@@ -1363,6 +1363,17 @@ class SchedulerReportClientTests(test.TestCase):
         resp = self.client._reshape(self.context, inventories, allocs)
         self.assertEqual(204, resp.status_code)
 
+        # Trigger a generation conflict. We can do this by simply sending
+        # back the same reshape; it will not work because the previous
+        # reshape updated the provider generations.
+        self.assertRaises(
+            exception.PlacementReshapeConflict,
+            self.client._reshape,
+            self.context,
+            inventories,
+            allocs,
+        )
+
     def test_update_from_provider_tree_reshape(self):
         """Run update_from_provider_tree with reshaping."""
         exp_ptree = self._set_up_provider_tree()
@@ -1519,3 +1530,44 @@ class SchedulerReportClientTests(test.TestCase):
             self.context, self.compute_name)
         self.assertProviderTree(orig_exp_ptree, ptree)
         self.assertAllocations(orig_exp_allocs, allocs)
+
+    def test_update_from_provider_tree_reshape_conflict_retry(self):
+        exp_ptree = self._set_up_provider_tree()
+
+        ptree = self.client.get_provider_tree_and_ensure_root(
+            self.context, self.compute_uuid)
+        allocs = self.client.get_allocations_for_provider_tree(
+            self.context, self.compute_name)
+        self.assertProviderTree(exp_ptree, ptree)
+        self.assertAllocations({}, allocs)
+
+        exp_allocs = self._set_up_provider_tree_allocs()
+
+        # We prepare inventory and allocation changes to trigger a reshape
+        for rp_uuid in ptree.get_provider_uuids():
+            # Add a new resource class to the inventories
+            ptree.update_inventory(
+                rp_uuid, dict(ptree.data(rp_uuid).inventory,
+                              CUSTOM_FOO={'total': 10}))
+            exp_ptree[rp_uuid]['inventory']['CUSTOM_FOO'] = {'total': 10}
+        for c_uuid, alloc in allocs.items():
+            for rp_uuid, res in alloc['allocations'].items():
+                res['resources']['CUSTOM_FOO'] = 1
+                exp_allocs[c_uuid]['allocations'][rp_uuid][
+                    'resources']['CUSTOM_FOO'] = 1
+
+        # As the inventory update happens in the same request as the
+        # allocation update, the allocation update will have a generation
+        # conflict. So we expect that it is signalled with an exception so
+        # that the upper layer can re-drive the reshape process with a fresh
+        # tree that now has the inventories.
+        self.assertRaises(
+            exception.PlacementReshapeConflict,
+            self.client.update_from_provider_tree,
+            self.context,
+            ptree,
+            allocations=allocs,
+        )
+        # We also expect that the internal caches are cleared so that the
+        # re-drive will have a chance to load fresh data from placement.
+        self.assertEqual(0, len(self.client._provider_tree.roots))
@@ -1710,12 +1710,18 @@ class TestUpdateComputeNode(BaseTestCase):
         self.assertEqual(exp_inv, ptree.data(new_compute.uuid).inventory)
         mock_sync_disabled.assert_called_once()
 
+    @ddt.data(
+        exc.ResourceProviderUpdateConflict(
+            uuid='uuid', generation=42, error='error'),
+        exc.PlacementReshapeConflict(error='error'),
+    )
     @mock.patch('nova.compute.resource_tracker.ResourceTracker.'
                 '_sync_compute_service_disabled_trait')
     @mock.patch('nova.compute.resource_tracker.ResourceTracker.'
                 '_resource_change', return_value=False)
-    def test_update_retry_success(self, mock_resource_change,
-                                  mock_sync_disabled):
+    def test_update_retry_success(
+        self, exc, mock_resource_change, mock_sync_disabled
+    ):
         self._setup_rt()
         orig_compute = _COMPUTE_NODE_FIXTURES[0].obj_clone()
         self.rt.compute_nodes[_NODENAME] = orig_compute
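The test now runs once per exception type via ddt. A minimal standalone example of how @ddt.ddt on the class plus @ddt.data(...) on the method generate one test per datum, passed in as the extra positional argument (assuming the ddt package is installed; the class and data below are illustrative).

import unittest

import ddt


@ddt.ddt
class TestRetryableErrors(unittest.TestCase):

    @ddt.data(ValueError('bad value'), KeyError('missing'))
    def test_is_exception(self, exc):
        # ddt injects each datum as the 'exc' argument of a generated test.
        self.assertIsInstance(exc, Exception)


if __name__ == '__main__':
    unittest.main()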
@@ -1729,9 +1735,7 @@ class TestUpdateComputeNode(BaseTestCase):
         self.driver_mock.update_provider_tree.side_effect = lambda *a: None
 
         ufpt_mock = self.rt.reportclient.update_from_provider_tree
-        ufpt_mock.side_effect = (
-            exc.ResourceProviderUpdateConflict(
-                uuid='uuid', generation=42, error='error'), None)
+        ufpt_mock.side_effect = (exc, None)
 
         self.rt._update(mock.sentinel.ctx, new_compute)
 
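The simplified side_effect = (exc, None) relies on standard unittest.mock behavior: when side_effect is an iterable, each call consumes the next item, and an exception instance in the sequence is raised instead of returned. So the first update_from_provider_tree call fails with the parameterized conflict and the retried call succeeds. A small standalone demonstration using only the standard library:

from unittest import mock

ufpt = mock.Mock()
ufpt.side_effect = (RuntimeError('conflict'), None)

try:
    ufpt()
except RuntimeError as e:
    print('first call raised:', e)

print('second call returned:', ufpt())  # prints: second call returned: None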