Add set_inventory() method on ResourceProvider
Adds a set_inventory() method on the ResourceProvider object that replaces the resource provider's inventory with a supplied set of inventory records in an InventoryList object. The replacement is done in a consistent fashion using the resource provider's generation value to ensure that a writer doesn't update the inventory of a resource provider after another writer has changed the inventory in between the original read of inventory information and the subsequent save of that inventory. A functional test is in place which attempts to exercise all three of add, delete and update inventory as well as causing a ConcurrentUpdateException. The object hash has changed but no version increment done because the ResourceProvider object is not yet in use in the wild. These changes are still bringing the eventual first version to its correct form. Partially-Implements: blueprint generic-resource-pools Change-Id: I2214fa5d176269e65fdcd7a5a7e4958a925235d3
This commit is contained in:
@@ -2143,3 +2143,8 @@ class InvalidReservedMemoryPagesOption(Invalid):
|
||||
msg_fmt = _("The format of the option 'reserved_huge_pages' is invalid. "
|
||||
"(found '%(conf)s') Please refer to the nova "
|
||||
"config-reference.")
|
||||
|
||||
|
||||
class ConcurrentUpdateDetected(NovaException):
|
||||
msg_fmt = _("Another thread concurrently updated the data. "
|
||||
"Please retry your update")
|
||||
|
@@ -11,6 +11,7 @@
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.orm import contains_eager
|
||||
|
||||
from nova.db.sqlalchemy import api as db_api
|
||||
@@ -20,6 +21,9 @@ from nova import objects
|
||||
from nova.objects import base
|
||||
from nova.objects import fields
|
||||
|
||||
_INV_TBL = models.Inventory.__table__
|
||||
_RP_TBL = models.ResourceProvider.__table__
|
||||
|
||||
|
||||
@db_api.api_context_manager.writer
|
||||
def _create_rp_in_db(context, updates):
|
||||
@@ -46,6 +50,148 @@ def _get_rp_by_uuid_from_db(context, uuid):
|
||||
return result
|
||||
|
||||
|
||||
def _get_current_inventory_resources(conn, rp):
|
||||
"""Returns a set() containing the resource class IDs for all resources
|
||||
currently having an inventory record for the supplied resource provider.
|
||||
|
||||
:param conn: DB connection to use.
|
||||
:param rp: Resource provider to query inventory for.
|
||||
"""
|
||||
cur_res_sel = sa.select([_INV_TBL.c.resource_class_id]).where(
|
||||
_INV_TBL.c.resource_provider_id == rp.id)
|
||||
existing_resources = conn.execute(cur_res_sel).fetchall()
|
||||
return set([r[0] for r in existing_resources])
|
||||
|
||||
|
||||
def _delete_inventory_from_provider(conn, rp, to_delete):
|
||||
"""Deletes any inventory records from the supplied provider and set() of
|
||||
resource class identifiers.
|
||||
|
||||
:param conn: DB connection to use.
|
||||
:param rp: Resource provider from which to delete inventory.
|
||||
:param to_delete: set() containing resource class IDs for records to
|
||||
delete.
|
||||
"""
|
||||
del_stmt = _INV_TBL.delete().where(sa.and_(
|
||||
_INV_TBL.c.resource_provider_id == rp.id,
|
||||
_INV_TBL.c.resource_class_id.in_(to_delete)))
|
||||
conn.execute(del_stmt)
|
||||
|
||||
|
||||
def _add_inventory_to_provider(conn, rp, inv_list, to_add):
|
||||
"""Inserts new inventory records for the supplied resource provider.
|
||||
|
||||
:param conn: DB connection to use.
|
||||
:param rp: Resource provider to add inventory to.
|
||||
:param inv_list: InventoryList object
|
||||
:param to_add: set() containing resource class IDs to search inv_list for
|
||||
adding to resource provider.
|
||||
"""
|
||||
for res_class in to_add:
|
||||
inv_record = inv_list.find(res_class)
|
||||
ins_stmt = _INV_TBL.insert().values(
|
||||
resource_provider_id=rp.id,
|
||||
resource_class_id=res_class,
|
||||
total=inv_record.total,
|
||||
reserved=inv_record.reserved,
|
||||
min_unit=inv_record.min_unit,
|
||||
max_unit=inv_record.max_unit,
|
||||
step_size=inv_record.step_size,
|
||||
allocation_ratio=inv_record.allocation_ratio)
|
||||
conn.execute(ins_stmt)
|
||||
|
||||
|
||||
def _update_inventory_for_provider(conn, rp, inv_list, to_update):
|
||||
"""Updates existing inventory records for the supplied resource provider.
|
||||
|
||||
:param conn: DB connection to use.
|
||||
:param rp: Resource provider to add inventory to.
|
||||
:param inv_list: InventoryList object
|
||||
:param to_update: set() containing resource class IDs to search inv_list
|
||||
for updating in resource provider.
|
||||
"""
|
||||
for res_class in to_update:
|
||||
inv_record = inv_list.find(res_class)
|
||||
upd_stmt = _INV_TBL.update().where(sa.and_(
|
||||
_INV_TBL.c.resource_provider_id == rp.id,
|
||||
_INV_TBL.c.resource_class_id == res_class)).values(
|
||||
total=inv_record.total,
|
||||
reserved=inv_record.reserved,
|
||||
min_unit=inv_record.min_unit,
|
||||
max_unit=inv_record.max_unit,
|
||||
step_size=inv_record.step_size,
|
||||
allocation_ratio=inv_record.allocation_ratio)
|
||||
conn.execute(upd_stmt)
|
||||
|
||||
|
||||
def _increment_provider_generation(conn, rp):
|
||||
"""Increments the supplied provider's generation value, supplying the
|
||||
currently-known generation. Returns whether the increment succeeded.
|
||||
|
||||
:param conn: DB connection to use.
|
||||
:param rp: `ResourceProvider` whose generation should be updated.
|
||||
:returns True if the generation was incremented, False otherwise.
|
||||
"""
|
||||
rp_gen = rp.generation
|
||||
new_generation = rp_gen + 1
|
||||
upd_stmt = _RP_TBL.update().where(sa.and_(
|
||||
_RP_TBL.c.id == rp.id,
|
||||
_RP_TBL.c.generation == rp_gen)).values(
|
||||
generation=(new_generation))
|
||||
|
||||
res = conn.execute(upd_stmt)
|
||||
if res.rowcount != 1:
|
||||
raise exception.ConcurrentUpdateDetected
|
||||
return new_generation
|
||||
|
||||
|
||||
@db_api.api_context_manager.writer
|
||||
def _set_inventory(context, rp, inv_list):
|
||||
"""Given an InventoryList object, replaces the inventory of the
|
||||
resource provider in a safe, atomic fashion using the resource
|
||||
provider's generation as a consistent view marker.
|
||||
|
||||
:param rp: `ResourceProvider` object upon which to set inventory.
|
||||
:param inv_list: `InventoryList` object to save to backend storage.
|
||||
:raises `ConcurrentUpdateDetected` if another thread updated the
|
||||
same resource provider's view of its inventory or allocations
|
||||
in between the time when this object was originally read
|
||||
and the call to set the inventory.
|
||||
"""
|
||||
|
||||
conn = context.session.connection()
|
||||
|
||||
existing_resources = _get_current_inventory_resources(conn, rp)
|
||||
these_resources = set([fields.ResourceClass.index(r.resource_class)
|
||||
for r in inv_list.objects])
|
||||
|
||||
# Determine which resources we should be adding, deleting and/or
|
||||
# updating in the resource provider's inventory by comparing sets
|
||||
# of resource class identifiers.
|
||||
to_add = these_resources - existing_resources
|
||||
to_delete = existing_resources - these_resources
|
||||
to_update = these_resources & existing_resources
|
||||
|
||||
with conn.begin():
|
||||
if to_delete:
|
||||
_delete_inventory_from_provider(conn, rp, to_delete)
|
||||
if to_add:
|
||||
_add_inventory_to_provider(conn, rp, inv_list, to_add)
|
||||
if to_update:
|
||||
_update_inventory_for_provider(conn, rp, inv_list, to_update)
|
||||
|
||||
# Here is where we update the resource provider's generation value.
|
||||
# If this update updates zero rows, that means that another
|
||||
# thread has updated the inventory for this resource provider
|
||||
# between the time the caller originally read the resource provider
|
||||
# record and inventory information and this point. We raise an
|
||||
# exception here which will rollback the above transaction and
|
||||
# return an error to the caller to indicate that they can attempt
|
||||
# to retry the inventory save after reverifying any capacity
|
||||
# conditions and re-reading the existing inventory information.
|
||||
rp.generation = _increment_provider_generation(conn, rp)
|
||||
|
||||
|
||||
@base.NovaObjectRegistry.register
|
||||
class ResourceProvider(base.NovaObject):
|
||||
# Version 1.0: Initial version
|
||||
@@ -87,6 +233,11 @@ class ResourceProvider(base.NovaObject):
|
||||
db_resource_provider = cls._get_by_uuid_from_db(context, uuid)
|
||||
return cls._from_db_object(context, cls(), db_resource_provider)
|
||||
|
||||
@base.remotable
|
||||
def set_inventory(self, inv_list):
|
||||
_set_inventory(self._context, self, inv_list)
|
||||
self.obj_reset_changes()
|
||||
|
||||
@staticmethod
|
||||
def _create_in_db(context, updates):
|
||||
return _create_rp_in_db(context, updates)
|
||||
|
@@ -136,3 +136,88 @@ class ResourceProviderTestCase(test.NoDBTestCase):
|
||||
objects.InventoryList.get_all_by_resource_provider_uuid(
|
||||
self.context, resource_provider.uuid))
|
||||
self.assertEqual(33, reloaded_inventories[0].total)
|
||||
|
||||
def test_provider_set_inventory(self):
|
||||
rp = objects.ResourceProvider(context=self.context,
|
||||
uuid=uuidsentinel.rp_uuid,
|
||||
name=uuidsentinel.rp_name)
|
||||
rp.create()
|
||||
saved_generation = rp.generation
|
||||
|
||||
disk_inv = objects.Inventory(
|
||||
resource_provider=rp,
|
||||
resource_class=fields.ResourceClass.DISK_GB,
|
||||
total=1024,
|
||||
reserved=15,
|
||||
min_unit=10,
|
||||
max_unit=100,
|
||||
step_size=10,
|
||||
allocation_ratio=1.0)
|
||||
|
||||
vcpu_inv = objects.Inventory(
|
||||
resource_provider=rp,
|
||||
resource_class=fields.ResourceClass.VCPU,
|
||||
total=12,
|
||||
reserved=0,
|
||||
min_unit=1,
|
||||
max_unit=12,
|
||||
step_size=1,
|
||||
allocation_ratio=16.0)
|
||||
|
||||
# set to new list
|
||||
inv_list = objects.InventoryList(objects=[disk_inv, vcpu_inv])
|
||||
rp.set_inventory(inv_list)
|
||||
|
||||
# generation has bumped
|
||||
self.assertEqual(saved_generation + 1, rp.generation)
|
||||
saved_generation = rp.generation
|
||||
|
||||
new_inv_list = objects.InventoryList.get_all_by_resource_provider_uuid(
|
||||
self.context, uuidsentinel.rp_uuid)
|
||||
self.assertEqual(2, len(new_inv_list))
|
||||
resource_classes = [inv.resource_class for inv in new_inv_list]
|
||||
self.assertIn(fields.ResourceClass.VCPU, resource_classes)
|
||||
self.assertIn(fields.ResourceClass.DISK_GB, resource_classes)
|
||||
|
||||
# reset list to just disk_inv
|
||||
inv_list = objects.InventoryList(objects=[disk_inv])
|
||||
rp.set_inventory(inv_list)
|
||||
|
||||
# generation has bumped
|
||||
self.assertEqual(saved_generation + 1, rp.generation)
|
||||
saved_generation = rp.generation
|
||||
|
||||
new_inv_list = objects.InventoryList.get_all_by_resource_provider_uuid(
|
||||
self.context, uuidsentinel.rp_uuid)
|
||||
self.assertEqual(1, len(new_inv_list))
|
||||
resource_classes = [inv.resource_class for inv in new_inv_list]
|
||||
self.assertNotIn(fields.ResourceClass.VCPU, resource_classes)
|
||||
self.assertIn(fields.ResourceClass.DISK_GB, resource_classes)
|
||||
self.assertEqual(1024, new_inv_list[0].total)
|
||||
|
||||
# update existing disk inv to new settings
|
||||
disk_inv = objects.Inventory(
|
||||
resource_provider=rp,
|
||||
resource_class=fields.ResourceClass.DISK_GB,
|
||||
total=2048,
|
||||
reserved=15,
|
||||
min_unit=10,
|
||||
max_unit=100,
|
||||
step_size=10,
|
||||
allocation_ratio=1.0)
|
||||
inv_list = objects.InventoryList(objects=[disk_inv])
|
||||
rp.set_inventory(inv_list)
|
||||
|
||||
# generation has bumped
|
||||
self.assertEqual(saved_generation + 1, rp.generation)
|
||||
saved_generation = rp.generation
|
||||
|
||||
new_inv_list = objects.InventoryList.get_all_by_resource_provider_uuid(
|
||||
self.context, uuidsentinel.rp_uuid)
|
||||
self.assertEqual(1, len(new_inv_list))
|
||||
self.assertEqual(2048, new_inv_list[0].total)
|
||||
|
||||
# fail when generation wrong
|
||||
rp.generation = rp.generation - 1
|
||||
self.assertRaises(exception.ConcurrentUpdateDetected,
|
||||
rp.set_inventory, inv_list)
|
||||
|
@@ -1180,7 +1180,7 @@ object_data = {
|
||||
'Quotas': '1.2-1fe4cd50593aaf5d36a6dc5ab3f98fb3',
|
||||
'QuotasNoOp': '1.2-e041ddeb7dc8188ca71706f78aad41c1',
|
||||
'RequestSpec': '1.6-c1cb516acdf120d367a42d343ed695b5',
|
||||
'ResourceProvider': '1.0-57f2a7e6fa50c6573af211521e83f8c7',
|
||||
'ResourceProvider': '1.0-94e0e906feb26a24e217935c1e401467',
|
||||
'S3ImageMapping': '1.0-7dd7366a890d82660ed121de9092276e',
|
||||
'SchedulerLimits': '1.0-249c4bd8e62a9b327b7026b7f19cc641',
|
||||
'SchedulerRetries': '1.1-3c9c8b16143ebbb6ad7030e999d14cc0',
|
||||
|
Reference in New Issue
Block a user