diff --git a/nova/api/openstack/placement/objects/resource_provider.py b/nova/api/openstack/placement/objects/resource_provider.py
index d5eb8436e..1c7e5a01c 100644
--- a/nova/api/openstack/placement/objects/resource_provider.py
+++ b/nova/api/openstack/placement/objects/resource_provider.py
@@ -4034,3 +4034,89 @@ class AllocationCandidates(base.VersionedObject):
             kept_summary_objs = summary_objs
 
         return alloc_request_objs, kept_summary_objs
+
+
+@db_api.placement_context_manager.writer
+def reshape(ctx, inventories, allocations):
+    """The 'replace the world' strategy that is executed when we want to
+    completely replace a set of provider inventory, allocation and consumer
+    information in a single transaction.
+
+    :note: The reason this has to be done in a single monolithic function is
+           so we have a single top-level function to decorate with the
+           @db_api.placement_context_manager.writer transaction context
+           manager. Each time a top-level function that is decorated with this
+           exits, the transaction is either COMMIT'd or ROLLBACK'd. We need to
+           avoid calling two functions that are already decorated with a
+           transaction context manager from a function that *isn't* decorated
+           with the transaction context manager if we want all changes involved
+           in the sub-functions to operate within a single DB transaction.
+
+    :param ctx: `nova.api.openstack.placement.context.RequestContext` object
+                containing the DB transaction context.
+    :param inventories: dict, keyed by resource provider UUID, of
+                        `InventoryList` objects representing the replaced
+                        inventory information for the provider.
+    :param allocations: `AllocationList` object containing all allocations for
+                        all consumers being modified by the reshape operation.
+    :raises: `exception.ConcurrentUpdateDetected` when any resource provider or
+             consumer generation increment fails due to concurrent changes to
+             the same objects.
+    """
+    # The resource provider objects, keyed by provider UUID, that are involved
+    # in this transaction. We keep a cache of these because as we perform the
+    # various operations on the providers, their generations increment and we
+    # want to "inject" the changed resource provider objects into the
+    # AllocationList's objects before calling AllocationList.replace_all()
+    affected_providers = {}
+    # We have to do the inventory changes in two steps because:
+    # - we can't delete inventories with allocations; and
+    # - we can't create allocations on nonexistent inventories.
+    # So in the first step we create a kind of "union" inventory for each
+    # provider. It contains all the inventories that the request wishes to
+    # exist in the end, PLUS any inventories that the request wished to remove
+    # (in their original form).
+    # Note that this can cause us to end up with an interim situation where we
+    # have modified an inventory to have less capacity than is currently
+    # allocated, but that's allowed by the code. If the final picture is
+    # overcommitted, we'll get an appropriate exception when we replace the
+    # allocations at the end.
+    for rp_uuid, new_inv_list in inventories.items():
+        LOG.debug("reshaping: *interim* inventory replacement for provider %s",
+                  rp_uuid)
+        rp = new_inv_list[0].resource_provider
+        # A dict, keyed by resource class, of the Inventory objects. We start
+        # with the original inventory list.
+        inv_by_rc = {inv.resource_class: inv for inv in
+                     InventoryList.get_all_by_resource_provider(ctx, rp)}
+        # Now add each inventory in the new inventory list. If an inventory for
+        # that resource class existed in the original inventory list, it is
+        # overwritten.
+        for inv in new_inv_list:
+            inv_by_rc[inv.resource_class] = inv
+        # Set the interim inventory structure.
+        rp.set_inventory(InventoryList(objects=list(inv_by_rc.values())))
+        affected_providers[rp_uuid] = rp
+
+    # NOTE(jaypipes): The above inventory replacements will have
+    # incremented the resource provider generations, so we need to look in
+    # the AllocationList and swap the resource provider object with the one we
+    # saved above that has the updated provider generation in it.
+    for alloc in allocations:
+        rp_uuid = alloc.resource_provider.uuid
+        if rp_uuid in affected_providers:
+            alloc.resource_provider = affected_providers[rp_uuid]
+
+    # Now we can replace all the allocations
+    LOG.debug("reshaping: attempting allocation replacement")
+    allocations.replace_all()
+
+    # And finally, we can set the inventories to their actual desired state.
+    for rp_uuid, new_inv_list in inventories.items():
+        LOG.debug("reshaping: *final* inventory replacement for provider %s",
+                  rp_uuid)
+        # TODO(efried): If we wanted this to be more efficient, we could keep
+        # track of providers for which all inventories are being deleted in the
+        # above loop and just do those and skip the rest, since they're already
+        # in their final form.
+        new_inv_list[0].resource_provider.set_inventory(new_inv_list)
diff --git a/nova/tests/functional/api/openstack/placement/db/test_reshape.py b/nova/tests/functional/api/openstack/placement/db/test_reshape.py
new file mode 100644
index 000000000..74e87f40a
--- /dev/null
+++ b/nova/tests/functional/api/openstack/placement/db/test_reshape.py
@@ -0,0 +1,359 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova.api.openstack.placement import exception
+from nova.api.openstack.placement.objects import consumer as consumer_obj
+from nova.api.openstack.placement.objects import resource_provider as rp_obj
+from nova.tests.functional.api.openstack.placement.db import test_base as tb
+from nova.tests import uuidsentinel as uuids
+
+
+def alloc_for_rc(alloc_list, rc):
+    for alloc in alloc_list:
+        if alloc.resource_class == rc:
+            return alloc
+
+
+class ReshapeTestCase(tb.PlacementDbBaseTestCase):
+    """Test 'replace the world' reshape transaction."""
+
+    def test_reshape(self):
+        """We set up the following scenario:
+
+        BEFORE: single compute node setup
+
+        A single compute node with:
+        - VCPU, MEMORY_MB, DISK_GB inventory
+        - Two instances consuming CPU, RAM and DISK from that compute node
+
+        AFTER: hierarchical + shared storage setup
+
+        A compute node parent provider with:
+        - MEMORY_MB
+        Two NUMA node child providers containing:
+        - VCPU
+        Shared storage provider with:
+        - DISK_GB
+        Both instances have their resources split among the providers and
+        shared storage accordingly
+        """
+        # First create our consumers
+        i1_uuid = uuids.instance1
+        i1_consumer = consumer_obj.Consumer(
+            self.ctx, uuid=i1_uuid, user=self.user_obj,
+            project=self.project_obj)
+        i1_consumer.create()
+
+        i2_uuid = uuids.instance2
+        i2_consumer = consumer_obj.Consumer(
+            self.ctx, uuid=i2_uuid, user=self.user_obj,
+            project=self.project_obj)
+        i2_consumer.create()
+
+        cn1 = self._create_provider('cn1')
+        tb.add_inventory(cn1, 'VCPU', 16)
+        tb.add_inventory(cn1, 'MEMORY_MB', 32768)
+        tb.add_inventory(cn1, 'DISK_GB', 1000)
+
+        # Allocate both instances against the single compute node
+        for consumer in (i1_consumer, i2_consumer):
+            allocs = [
+                rp_obj.Allocation(
+                    self.ctx, resource_provider=cn1,
+                    resource_class='VCPU', consumer=consumer, used=2),
+                rp_obj.Allocation(
+                    self.ctx, resource_provider=cn1,
+                    resource_class='MEMORY_MB', consumer=consumer, used=1024),
+                rp_obj.Allocation(
+                    self.ctx, resource_provider=cn1,
+                    resource_class='DISK_GB', consumer=consumer, used=100),
+            ]
+            alloc_list = rp_obj.AllocationList(self.ctx, objects=allocs)
+            alloc_list.replace_all()
+
+        # Verify we have the allocations we expect for the BEFORE scenario
+        before_allocs_i1 = rp_obj.AllocationList.get_all_by_consumer_id(
+            self.ctx, i1_uuid)
+        self.assertEqual(3, len(before_allocs_i1))
+        self.assertEqual(cn1.uuid, before_allocs_i1[0].resource_provider.uuid)
+        before_allocs_i2 = rp_obj.AllocationList.get_all_by_consumer_id(
+            self.ctx, i2_uuid)
+        self.assertEqual(3, len(before_allocs_i2))
+        self.assertEqual(cn1.uuid, before_allocs_i2[2].resource_provider.uuid)
+
+        # Before we issue the actual reshape() call, we need to first create
+        # the child providers and sharing storage provider. These are actions
+        # that the virt driver or external agent is responsible for performing
+        # *before* attempting any reshape activity.
+        cn1_numa0 = self._create_provider('cn1_numa0', parent=cn1.uuid)
+        cn1_numa1 = self._create_provider('cn1_numa1', parent=cn1.uuid)
+        ss = self._create_provider('ss')
+
+        # OK, now emulate the call to POST /reshaper that will be triggered by
+        # a virt driver wanting to replace the world and change its modeling
+        # from a single provider to a nested provider tree along with a sharing
+        # storage provider.
+        after_inventories = {
+            # cn1 keeps the RAM only
+            cn1.uuid: rp_obj.InventoryList(self.ctx, objects=[
+                rp_obj.Inventory(
+                    self.ctx, resource_provider=cn1,
+                    resource_class='MEMORY_MB', total=32768, reserved=0,
+                    max_unit=32768, min_unit=1, step_size=1,
+                    allocation_ratio=1.0),
+            ]),
+            # each NUMA node gets half of the CPUs
+            cn1_numa0.uuid: rp_obj.InventoryList(self.ctx, objects=[
+                rp_obj.Inventory(
+                    self.ctx, resource_provider=cn1_numa0,
+                    resource_class='VCPU', total=8, reserved=0,
+                    max_unit=8, min_unit=1, step_size=1,
+                    allocation_ratio=1.0),
+            ]),
+            cn1_numa1.uuid: rp_obj.InventoryList(self.ctx, objects=[
+                rp_obj.Inventory(
+                    self.ctx, resource_provider=cn1_numa1,
+                    resource_class='VCPU', total=8, reserved=0,
+                    max_unit=8, min_unit=1, step_size=1,
+                    allocation_ratio=1.0),
+            ]),
+            # The sharing provider gets a bunch of disk
+            ss.uuid: rp_obj.InventoryList(self.ctx, objects=[
+                rp_obj.Inventory(
+                    self.ctx, resource_provider=ss,
+                    resource_class='DISK_GB', total=100000, reserved=0,
+                    max_unit=1000, min_unit=1, step_size=1,
+                    allocation_ratio=1.0),
+            ]),
+        }
+        # We do a fetch from the DB for each instance to get its latest
+        # generation. This would be done by the resource tracker or scheduler
+        # report client before issuing the call to reshape() because the
+        # consumers representing the two instances above will have had their
+        # generations incremented in the original call to PUT
+        # /allocations/{consumer_uuid}
+        i1_consumer = consumer_obj.Consumer.get_by_uuid(self.ctx, i1_uuid)
+        i2_consumer = consumer_obj.Consumer.get_by_uuid(self.ctx, i2_uuid)
+        after_allocs = rp_obj.AllocationList(self.ctx, objects=[
+            # instance1 gets VCPU from NUMA0, MEMORY_MB from cn1 and DISK_GB
+            # from the sharing storage provider
+            rp_obj.Allocation(
+                self.ctx, resource_provider=cn1_numa0, resource_class='VCPU',
+                consumer=i1_consumer, used=2),
+            rp_obj.Allocation(
+                self.ctx, resource_provider=cn1, resource_class='MEMORY_MB',
+                consumer=i1_consumer, used=1024),
+            rp_obj.Allocation(
+                self.ctx, resource_provider=ss, resource_class='DISK_GB',
+                consumer=i1_consumer, used=100),
+            # instance2 gets VCPU from NUMA1, MEMORY_MB from cn1 and DISK_GB
+            # from the sharing storage provider
+            rp_obj.Allocation(
+                self.ctx, resource_provider=cn1_numa1, resource_class='VCPU',
+                consumer=i2_consumer, used=2),
+            rp_obj.Allocation(
+                self.ctx, resource_provider=cn1, resource_class='MEMORY_MB',
+                consumer=i2_consumer, used=1024),
+            rp_obj.Allocation(
+                self.ctx, resource_provider=ss, resource_class='DISK_GB',
+                consumer=i2_consumer, used=100),
+        ])
+        rp_obj.reshape(self.ctx, after_inventories, after_allocs)
+
+        # Verify that the inventories have been moved to the appropriate
+        # providers in the AFTER scenario
+
+        # The root compute node should only have MEMORY_MB, nothing else
+        cn1_inv = rp_obj.InventoryList.get_all_by_resource_provider(
+            self.ctx, cn1)
+        self.assertEqual(1, len(cn1_inv))
+        self.assertEqual('MEMORY_MB', cn1_inv[0].resource_class)
+        self.assertEqual(32768, cn1_inv[0].total)
+        # Each NUMA node should only have half the original VCPU, nothing else
+        numa0_inv = rp_obj.InventoryList.get_all_by_resource_provider(
+            self.ctx, cn1_numa0)
+        self.assertEqual(1, len(numa0_inv))
+        self.assertEqual('VCPU', numa0_inv[0].resource_class)
+        self.assertEqual(8, numa0_inv[0].total)
+        numa1_inv = rp_obj.InventoryList.get_all_by_resource_provider(
+            self.ctx, cn1_numa1)
+        self.assertEqual(1, len(numa1_inv))
+        self.assertEqual('VCPU', numa1_inv[0].resource_class)
+        self.assertEqual(8, numa1_inv[0].total)
+        # The sharing storage provider should only have DISK_GB, nothing else
+        ss_inv = rp_obj.InventoryList.get_all_by_resource_provider(
+            self.ctx, ss)
+        self.assertEqual(1, len(ss_inv))
+        self.assertEqual('DISK_GB', ss_inv[0].resource_class)
+        self.assertEqual(100000, ss_inv[0].total)
+
+        # Verify we have the allocations we expect for the AFTER scenario
+        after_allocs_i1 = rp_obj.AllocationList.get_all_by_consumer_id(
+            self.ctx, i1_uuid)
+        self.assertEqual(3, len(after_allocs_i1))
+        # Our VCPU allocation should be in the NUMA0 node
+        vcpu_alloc = alloc_for_rc(after_allocs_i1, 'VCPU')
+        self.assertIsNotNone(vcpu_alloc)
+        self.assertEqual(cn1_numa0.uuid, vcpu_alloc.resource_provider.uuid)
+        # Our DISK_GB allocation should be in the sharing provider
+        disk_alloc = alloc_for_rc(after_allocs_i1, 'DISK_GB')
+        self.assertIsNotNone(disk_alloc)
+        self.assertEqual(ss.uuid, disk_alloc.resource_provider.uuid)
+        # And our MEMORY_MB should remain on the root compute node
+        ram_alloc = alloc_for_rc(after_allocs_i1, 'MEMORY_MB')
+        self.assertIsNotNone(ram_alloc)
+        self.assertEqual(cn1.uuid, ram_alloc.resource_provider.uuid)
+
+        after_allocs_i2 = rp_obj.AllocationList.get_all_by_consumer_id(
+            self.ctx, i2_uuid)
+        self.assertEqual(3, len(after_allocs_i2))
+        # Our VCPU allocation should be in the NUMA1 node
+        vcpu_alloc = alloc_for_rc(after_allocs_i2, 'VCPU')
+        self.assertIsNotNone(vcpu_alloc)
+        self.assertEqual(cn1_numa1.uuid, vcpu_alloc.resource_provider.uuid)
+        # Our DISK_GB allocation should be in the sharing provider
+        disk_alloc = alloc_for_rc(after_allocs_i2, 'DISK_GB')
+        self.assertIsNotNone(disk_alloc)
+        self.assertEqual(ss.uuid, disk_alloc.resource_provider.uuid)
+        # And our MEMORY_MB should remain on the root compute node
+        ram_alloc = alloc_for_rc(after_allocs_i2, 'MEMORY_MB')
+        self.assertIsNotNone(ram_alloc)
+        self.assertEqual(cn1.uuid, ram_alloc.resource_provider.uuid)
+
+    def test_reshape_concurrent_inventory_update(self):
+        """Valid failure scenario for reshape(). We test a situation where the
+        virt driver has constructed its "after inventories and allocations"
+        and sent those to the POST /reshaper endpoint. The reshaper POST
+        handler does a quick check of the resource provider generations sent
+        in the payload and they all check out.
+
+        However, right before the call to resource_provider.reshape(), another
+        thread legitimately changes the inventory of one of the providers
+        involved in the reshape transaction. We should get a
+        ConcurrentUpdateDetected in this case.
+        """
+        # First create our consumers
+        i1_uuid = uuids.instance1
+        i1_consumer = consumer_obj.Consumer(
+            self.ctx, uuid=i1_uuid, user=self.user_obj,
+            project=self.project_obj)
+        i1_consumer.create()
+
+        # then all our original providers
+        cn1 = self._create_provider('cn1')
+        tb.add_inventory(cn1, 'VCPU', 16)
+        tb.add_inventory(cn1, 'MEMORY_MB', 32768)
+        tb.add_inventory(cn1, 'DISK_GB', 1000)
+
+        # Allocate an instance on our compute node
+        allocs = [
+            rp_obj.Allocation(
+                self.ctx, resource_provider=cn1,
+                resource_class='VCPU', consumer=i1_consumer, used=2),
+            rp_obj.Allocation(
+                self.ctx, resource_provider=cn1,
+                resource_class='MEMORY_MB', consumer=i1_consumer, used=1024),
+            rp_obj.Allocation(
+                self.ctx, resource_provider=cn1,
+                resource_class='DISK_GB', consumer=i1_consumer, used=100),
+        ]
+        alloc_list = rp_obj.AllocationList(self.ctx, objects=allocs)
+        alloc_list.replace_all()
+
+        # Before we issue the actual reshape() call, we need to first create
+        # the child providers and sharing storage provider. These are actions
+        # that the virt driver or external agent is responsible for performing
+        # *before* attempting any reshape activity.
+        cn1_numa0 = self._create_provider('cn1_numa0', parent=cn1.uuid)
+        cn1_numa1 = self._create_provider('cn1_numa1', parent=cn1.uuid)
+        ss = self._create_provider('ss')
+
+        # OK, now emulate the call to POST /reshaper that will be triggered by
+        # a virt driver wanting to replace the world and change its modeling
+        # from a single provider to a nested provider tree along with a sharing
+        # storage provider.
+        after_inventories = {
+            # cn1 keeps the RAM only
+            cn1.uuid: rp_obj.InventoryList(self.ctx, objects=[
+                rp_obj.Inventory(
+                    self.ctx, resource_provider=cn1,
+                    resource_class='MEMORY_MB', total=32768, reserved=0,
+                    max_unit=32768, min_unit=1, step_size=1,
+                    allocation_ratio=1.0),
+            ]),
+            # each NUMA node gets half of the CPUs
+            cn1_numa0.uuid: rp_obj.InventoryList(self.ctx, objects=[
+                rp_obj.Inventory(
+                    self.ctx, resource_provider=cn1_numa0,
+                    resource_class='VCPU', total=8, reserved=0,
+                    max_unit=8, min_unit=1, step_size=1,
+                    allocation_ratio=1.0),
+            ]),
+            cn1_numa1.uuid: rp_obj.InventoryList(self.ctx, objects=[
+                rp_obj.Inventory(
+                    self.ctx, resource_provider=cn1_numa1,
+                    resource_class='VCPU', total=8, reserved=0,
+                    max_unit=8, min_unit=1, step_size=1,
+                    allocation_ratio=1.0),
+            ]),
+            # The sharing provider gets a bunch of disk
+            ss.uuid: rp_obj.InventoryList(self.ctx, objects=[
+                rp_obj.Inventory(
+                    self.ctx, resource_provider=ss,
+                    resource_class='DISK_GB', total=100000, reserved=0,
+                    max_unit=1000, min_unit=1, step_size=1,
+                    allocation_ratio=1.0),
+            ]),
+        }
+        # We do a fetch from the DB for the instance to get its latest
+        # generation. This would be done by the resource tracker or scheduler
+        # report client before issuing the call to reshape() because the
+        # consumer representing the instance above will have had its
+        # generation incremented in the original call to PUT
+        # /allocations/{consumer_uuid}
+        i1_consumer = consumer_obj.Consumer.get_by_uuid(self.ctx, i1_uuid)
+        after_allocs = rp_obj.AllocationList(self.ctx, objects=[
+            # instance1 gets VCPU from NUMA0, MEMORY_MB from cn1 and DISK_GB
+            # from the sharing storage provider
+            rp_obj.Allocation(
+                self.ctx, resource_provider=cn1_numa0, resource_class='VCPU',
+                consumer=i1_consumer, used=2),
+            rp_obj.Allocation(
+                self.ctx, resource_provider=cn1, resource_class='MEMORY_MB',
+                consumer=i1_consumer, used=1024),
+            rp_obj.Allocation(
+                self.ctx, resource_provider=ss, resource_class='DISK_GB',
+                consumer=i1_consumer, used=100),
+        ])
+
+        # OK, now before we call reshape(), here we emulate another thread
+        # changing the inventory for the sharing storage provider in between
+        # the time in the REST handler when the sharing storage provider's
+        # generation was validated and the actual call to reshape()
+        ss_threadB = rp_obj.ResourceProvider.get_by_uuid(self.ctx, ss.uuid)
+        # Reduce the amount of storage to 2000, from 100000.
+        new_ss_inv = rp_obj.InventoryList(self.ctx, objects=[
+            rp_obj.Inventory(
+                self.ctx, resource_provider=ss_threadB,
+                resource_class='DISK_GB', total=2000, reserved=0,
+                max_unit=1000, min_unit=1, step_size=1,
+                allocation_ratio=1.0)])
+        ss_threadB.set_inventory(new_ss_inv)
+        # Double check our storage provider's generation is now greater than
+        # the original storage provider record being sent to reshape()
+        self.assertGreater(ss_threadB.generation, ss.generation)
+
+        # And we should now legitimately get a failure from reshape() due to
+        # another thread updating one of the involved providers' generations
+        self.assertRaises(
+            exception.ConcurrentUpdateDetected,
+            rp_obj.reshape, self.ctx, after_inventories, after_allocs)
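
The :note: in reshape()'s docstring is the load-bearing design decision in this patch: every sub-operation must run inside one DB transaction, which with oslo.db's enginefacade (the machinery behind db_api.placement_context_manager) means one top-level decorated function. A minimal sketch of that scoping rule, using enginefacade directly; the connection string, context class, and function names below are illustrative only, not placement code:

from oslo_context import context as oslo_context
from oslo_db.sqlalchemy import enginefacade

context_manager = enginefacade.transaction_context()
context_manager.configure(connection='sqlite://')


@enginefacade.transaction_context_provider
class RequestContext(oslo_context.RequestContext):
    """Context carrying the session/connection attributes enginefacade manages."""


@context_manager.writer
def _replace_inventories(ctx):
    # A live ctx.session exists here; any writes join the current transaction.
    pass


@context_manager.writer
def _replace_allocations(ctx):
    pass


def two_transactions(ctx):
    # Undecorated caller: each decorated callee is its own top-level entry,
    # so _replace_inventories() is COMMIT'd before _replace_allocations()
    # starts. A failure in the second call cannot roll back the first.
    _replace_inventories(ctx)
    _replace_allocations(ctx)


@context_manager.writer
def one_transaction(ctx):
    # Decorated caller: both callees join the enclosing transaction and
    # everything COMMITs or ROLLBACKs together -- the shape reshape() needs.
    _replace_inventories(ctx)
    _replace_allocations(ctx)


if __name__ == '__main__':
    one_transaction(RequestContext())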
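The ConcurrentUpdateDetected asserted in the second test comes from placement's generation mechanism: provider (and consumer) updates are guarded by a compare-and-swap on a generation column. A standalone sketch of that guard under a deliberately simplified schema; the table, column, and helper names here are illustrative and differ from the real ones in resource_provider.py:

import sqlalchemy as sa

metadata = sa.MetaData()
rp_table = sa.Table(
    'resource_providers', metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('generation', sa.Integer, nullable=False),
)


class ConcurrentUpdateDetected(Exception):
    pass


def increment_generation(conn, rp_id, expected_generation):
    """Compare-and-swap bump of a provider's generation."""
    upd = rp_table.update().where(sa.and_(
        rp_table.c.id == rp_id,
        rp_table.c.generation == expected_generation,
    )).values(generation=expected_generation + 1)
    if conn.execute(upd).rowcount != 1:
        # Another writer already bumped the generation (as ss_threadB does in
        # the test above): the caller's view of the provider is stale, so the
        # enclosing reshape transaction is rolled back as a whole.
        raise ConcurrentUpdateDetected()


engine = sa.create_engine('sqlite://')
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(rp_table.insert().values(id=1, generation=0))
    increment_generation(conn, 1, expected_generation=0)  # generation -> 1
    # A second writer still holding generation 0 would now raise:
    # increment_generation(conn, 1, expected_generation=0)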