Merge "Add set_inventory() method on ResourceProvider"

This commit is contained in:
Jenkins 2016-06-16 21:09:44 +00:00 committed by Gerrit Code Review
commit fab0cba7cd
4 changed files with 242 additions and 1 deletions

View File

@ -2148,3 +2148,8 @@ class InvalidReservedMemoryPagesOption(Invalid):
msg_fmt = _("The format of the option 'reserved_huge_pages' is invalid. "
"(found '%(conf)s') Please refer to the nova "
"config-reference.")
class ConcurrentUpdateDetected(NovaException):
msg_fmt = _("Another thread concurrently updated the data. "
"Please retry your update")

View File

@ -11,6 +11,7 @@
# under the License.
import six
import sqlalchemy as sa
from sqlalchemy.orm import contains_eager
from nova.db.sqlalchemy import api as db_api
@ -20,6 +21,9 @@ from nova import objects
from nova.objects import base
from nova.objects import fields
_INV_TBL = models.Inventory.__table__
_RP_TBL = models.ResourceProvider.__table__
@db_api.api_context_manager.writer
def _create_rp_in_db(context, updates):
@ -46,6 +50,148 @@ def _get_rp_by_uuid_from_db(context, uuid):
return result
def _get_current_inventory_resources(conn, rp):
"""Returns a set() containing the resource class IDs for all resources
currently having an inventory record for the supplied resource provider.
:param conn: DB connection to use.
:param rp: Resource provider to query inventory for.
"""
cur_res_sel = sa.select([_INV_TBL.c.resource_class_id]).where(
_INV_TBL.c.resource_provider_id == rp.id)
existing_resources = conn.execute(cur_res_sel).fetchall()
return set([r[0] for r in existing_resources])
def _delete_inventory_from_provider(conn, rp, to_delete):
"""Deletes any inventory records from the supplied provider and set() of
resource class identifiers.
:param conn: DB connection to use.
:param rp: Resource provider from which to delete inventory.
:param to_delete: set() containing resource class IDs for records to
delete.
"""
del_stmt = _INV_TBL.delete().where(sa.and_(
_INV_TBL.c.resource_provider_id == rp.id,
_INV_TBL.c.resource_class_id.in_(to_delete)))
conn.execute(del_stmt)
def _add_inventory_to_provider(conn, rp, inv_list, to_add):
"""Inserts new inventory records for the supplied resource provider.
:param conn: DB connection to use.
:param rp: Resource provider to add inventory to.
:param inv_list: InventoryList object
:param to_add: set() containing resource class IDs to search inv_list for
adding to resource provider.
"""
for res_class in to_add:
inv_record = inv_list.find(res_class)
ins_stmt = _INV_TBL.insert().values(
resource_provider_id=rp.id,
resource_class_id=res_class,
total=inv_record.total,
reserved=inv_record.reserved,
min_unit=inv_record.min_unit,
max_unit=inv_record.max_unit,
step_size=inv_record.step_size,
allocation_ratio=inv_record.allocation_ratio)
conn.execute(ins_stmt)
def _update_inventory_for_provider(conn, rp, inv_list, to_update):
"""Updates existing inventory records for the supplied resource provider.
:param conn: DB connection to use.
:param rp: Resource provider to add inventory to.
:param inv_list: InventoryList object
:param to_update: set() containing resource class IDs to search inv_list
for updating in resource provider.
"""
for res_class in to_update:
inv_record = inv_list.find(res_class)
upd_stmt = _INV_TBL.update().where(sa.and_(
_INV_TBL.c.resource_provider_id == rp.id,
_INV_TBL.c.resource_class_id == res_class)).values(
total=inv_record.total,
reserved=inv_record.reserved,
min_unit=inv_record.min_unit,
max_unit=inv_record.max_unit,
step_size=inv_record.step_size,
allocation_ratio=inv_record.allocation_ratio)
conn.execute(upd_stmt)
def _increment_provider_generation(conn, rp):
"""Increments the supplied provider's generation value, supplying the
currently-known generation. Returns whether the increment succeeded.
:param conn: DB connection to use.
:param rp: `ResourceProvider` whose generation should be updated.
:returns True if the generation was incremented, False otherwise.
"""
rp_gen = rp.generation
new_generation = rp_gen + 1
upd_stmt = _RP_TBL.update().where(sa.and_(
_RP_TBL.c.id == rp.id,
_RP_TBL.c.generation == rp_gen)).values(
generation=(new_generation))
res = conn.execute(upd_stmt)
if res.rowcount != 1:
raise exception.ConcurrentUpdateDetected
return new_generation
@db_api.api_context_manager.writer
def _set_inventory(context, rp, inv_list):
"""Given an InventoryList object, replaces the inventory of the
resource provider in a safe, atomic fashion using the resource
provider's generation as a consistent view marker.
:param rp: `ResourceProvider` object upon which to set inventory.
:param inv_list: `InventoryList` object to save to backend storage.
:raises `ConcurrentUpdateDetected` if another thread updated the
same resource provider's view of its inventory or allocations
in between the time when this object was originally read
and the call to set the inventory.
"""
conn = context.session.connection()
existing_resources = _get_current_inventory_resources(conn, rp)
these_resources = set([fields.ResourceClass.index(r.resource_class)
for r in inv_list.objects])
# Determine which resources we should be adding, deleting and/or
# updating in the resource provider's inventory by comparing sets
# of resource class identifiers.
to_add = these_resources - existing_resources
to_delete = existing_resources - these_resources
to_update = these_resources & existing_resources
with conn.begin():
if to_delete:
_delete_inventory_from_provider(conn, rp, to_delete)
if to_add:
_add_inventory_to_provider(conn, rp, inv_list, to_add)
if to_update:
_update_inventory_for_provider(conn, rp, inv_list, to_update)
# Here is where we update the resource provider's generation value.
# If this update updates zero rows, that means that another
# thread has updated the inventory for this resource provider
# between the time the caller originally read the resource provider
# record and inventory information and this point. We raise an
# exception here which will rollback the above transaction and
# return an error to the caller to indicate that they can attempt
# to retry the inventory save after reverifying any capacity
# conditions and re-reading the existing inventory information.
rp.generation = _increment_provider_generation(conn, rp)
@base.NovaObjectRegistry.register
class ResourceProvider(base.NovaObject):
# Version 1.0: Initial version
@ -87,6 +233,11 @@ class ResourceProvider(base.NovaObject):
db_resource_provider = cls._get_by_uuid_from_db(context, uuid)
return cls._from_db_object(context, cls(), db_resource_provider)
@base.remotable
def set_inventory(self, inv_list):
_set_inventory(self._context, self, inv_list)
self.obj_reset_changes()
@staticmethod
def _create_in_db(context, updates):
return _create_rp_in_db(context, updates)

View File

@ -136,3 +136,88 @@ class ResourceProviderTestCase(test.NoDBTestCase):
objects.InventoryList.get_all_by_resource_provider_uuid(
self.context, resource_provider.uuid))
self.assertEqual(33, reloaded_inventories[0].total)
def test_provider_set_inventory(self):
rp = objects.ResourceProvider(context=self.context,
uuid=uuidsentinel.rp_uuid,
name=uuidsentinel.rp_name)
rp.create()
saved_generation = rp.generation
disk_inv = objects.Inventory(
resource_provider=rp,
resource_class=fields.ResourceClass.DISK_GB,
total=1024,
reserved=15,
min_unit=10,
max_unit=100,
step_size=10,
allocation_ratio=1.0)
vcpu_inv = objects.Inventory(
resource_provider=rp,
resource_class=fields.ResourceClass.VCPU,
total=12,
reserved=0,
min_unit=1,
max_unit=12,
step_size=1,
allocation_ratio=16.0)
# set to new list
inv_list = objects.InventoryList(objects=[disk_inv, vcpu_inv])
rp.set_inventory(inv_list)
# generation has bumped
self.assertEqual(saved_generation + 1, rp.generation)
saved_generation = rp.generation
new_inv_list = objects.InventoryList.get_all_by_resource_provider_uuid(
self.context, uuidsentinel.rp_uuid)
self.assertEqual(2, len(new_inv_list))
resource_classes = [inv.resource_class for inv in new_inv_list]
self.assertIn(fields.ResourceClass.VCPU, resource_classes)
self.assertIn(fields.ResourceClass.DISK_GB, resource_classes)
# reset list to just disk_inv
inv_list = objects.InventoryList(objects=[disk_inv])
rp.set_inventory(inv_list)
# generation has bumped
self.assertEqual(saved_generation + 1, rp.generation)
saved_generation = rp.generation
new_inv_list = objects.InventoryList.get_all_by_resource_provider_uuid(
self.context, uuidsentinel.rp_uuid)
self.assertEqual(1, len(new_inv_list))
resource_classes = [inv.resource_class for inv in new_inv_list]
self.assertNotIn(fields.ResourceClass.VCPU, resource_classes)
self.assertIn(fields.ResourceClass.DISK_GB, resource_classes)
self.assertEqual(1024, new_inv_list[0].total)
# update existing disk inv to new settings
disk_inv = objects.Inventory(
resource_provider=rp,
resource_class=fields.ResourceClass.DISK_GB,
total=2048,
reserved=15,
min_unit=10,
max_unit=100,
step_size=10,
allocation_ratio=1.0)
inv_list = objects.InventoryList(objects=[disk_inv])
rp.set_inventory(inv_list)
# generation has bumped
self.assertEqual(saved_generation + 1, rp.generation)
saved_generation = rp.generation
new_inv_list = objects.InventoryList.get_all_by_resource_provider_uuid(
self.context, uuidsentinel.rp_uuid)
self.assertEqual(1, len(new_inv_list))
self.assertEqual(2048, new_inv_list[0].total)
# fail when generation wrong
rp.generation = rp.generation - 1
self.assertRaises(exception.ConcurrentUpdateDetected,
rp.set_inventory, inv_list)

View File

@ -1180,7 +1180,7 @@ object_data = {
'Quotas': '1.2-1fe4cd50593aaf5d36a6dc5ab3f98fb3',
'QuotasNoOp': '1.2-e041ddeb7dc8188ca71706f78aad41c1',
'RequestSpec': '1.6-c1cb516acdf120d367a42d343ed695b5',
'ResourceProvider': '1.0-57f2a7e6fa50c6573af211521e83f8c7',
'ResourceProvider': '1.0-94e0e906feb26a24e217935c1e401467',
'S3ImageMapping': '1.0-7dd7366a890d82660ed121de9092276e',
'SchedulerLimits': '1.0-249c4bd8e62a9b327b7026b7f19cc641',
'SchedulerRetries': '1.1-3c9c8b16143ebbb6ad7030e999d14cc0',