Narrow scope of set allocations database transaction

When consumer types were added [1], the set allocations code path was
modified to perform util.ensure_consumer() and
AllocationList.replace_all() in a single database transaction in order
to provide automatic rollbacks of consumer type changes by racing
requests.

This proved to be problematic because util.ensure_consumer() contains
logic to "get or create" database records for project, user, consumer
type, and consumer. By definition, the "get or create" pattern cannot
reside in the same database transaction else a DuplicateEntry database
error followed by an attempt to "get" the existing record will fail as
the single database transaction is being rolled back and no new
statements can be executed.

Because of this, we cannot wrap util.ensure_consumer() under the same
database transaction as Allocation.replace_all() and we need to reduce
the scope of the database transaction to cover only the updates to a
consumer (if the requested project/user/consumer type differ from what
was found in the database) and the write of the allocation records.

This changes things around to stop performing "difference" updates
inside of the util.ensure_consumer() method and instead return what was
requested as an additional return value. Then, we move the database
transaction to cover only util.update_consumers() and
AllocationList.replace_all() so that if replace_all() fails due to a
generation conflict, the consumer updates will also be rolled back.

This approach was chosen in an effort to introduce the least amount of
change to the existing understood code.

Story: 2009159
Task: 43143

[1] I24c2315093e07dbf25c4fb53152e6a4de7477a51

Change-Id: I887606047ac1da3f2c59c17988ddcc17bd01ef43
This commit is contained in:
melanie witt 2021-09-02 00:50:19 +00:00
parent 7d90341255
commit d90f16720b
6 changed files with 193 additions and 115 deletions

View File

@ -218,33 +218,43 @@ def inspect_consumers(context, data, want_version):
:param context: The placement context.
:param data: A dictionary of multiple allocations by consumer uuid.
:param want_version: the microversion matcher.
:return: A tuple of a dict of all consumer objects (by consumer uuid)
and a list of those consumer objects which are new.
:return: A 3-tuple of (a dict of all consumer objects (by consumer uuid),
a list of those consumer objects which are new,
a dict of RequestAttr objects (by consumer_uuid))
"""
# First, ensure that all consumers referenced in the payload actually
# exist. And if not, create them. Keep a record of auto-created consumers
# so we can clean them up if the end allocation replace_all() fails.
consumers = {} # dict of Consumer objects, keyed by consumer UUID
new_consumers_created = []
# Save requested attributes in order to do an update later in the same
# database transaction as AllocationList.replace_all() so that rollbacks
# can happen properly. Consumer table updates are guarded by the
# generation, so we can't necessarily save all of the original attribute
# values and write them back into the table in the event of an exception.
# If the generation doesn't match, Consumer.update() is a no-op.
requested_attrs = {}
for consumer_uuid in data:
project_id = data[consumer_uuid]['project_id']
user_id = data[consumer_uuid]['user_id']
consumer_generation = data[consumer_uuid].get('consumer_generation')
consumer_type = data[consumer_uuid].get('consumer_type')
try:
consumer, new_consumer_created = data_util.ensure_consumer(
context, consumer_uuid, project_id, user_id,
consumer_generation, consumer_type, want_version)
consumer, new_consumer_created, request_attr = (
data_util.ensure_consumer(
context, consumer_uuid, project_id,
user_id, consumer_generation, consumer_type, want_version))
if new_consumer_created:
new_consumers_created.append(consumer)
consumers[consumer_uuid] = consumer
requested_attrs[consumer_uuid] = request_attr
except Exception:
# If any errors (for instance, a consumer generation conflict)
# occur when ensuring consumer records above, make sure we delete
# any auto-created consumers.
with excutils.save_and_reraise_exception():
delete_consumers(new_consumers_created)
return consumers, new_consumers_created
return consumers, new_consumers_created, requested_attrs
@wsgi_wrapper.PlacementWsgify
@ -402,39 +412,28 @@ def _set_allocations_for_consumer(req, schema):
}
allocation_data = allocations_dict
# NOTE(melwitt): Group all of the database updates in a single transaction
# so that updates get rolled back automatically in the event of a consumer
# generation conflict.
_set_allocations_for_consumer_same_transaction(
context, consumer_uuid, data, allocation_data, want_version)
req.response.status = 204
req.response.content_type = None
return req.response
@db_api.placement_context_manager.writer
def _set_allocations_for_consumer_same_transaction(context, consumer_uuid,
data, allocation_data,
want_version):
allocation_objects = []
# Consumer object saved in case we need to delete the auto-created consumer
# record
consumer = None
# Whether we created a new consumer record
created_new_consumer = False
if not allocation_data:
# The allocations are empty, which means wipe them out. Internal
# to the allocation object this is signalled by a used value of 0.
# We still need to verify the consumer's generation, though, which
# we do in _ensure_consumer()
# NOTE(jaypipes): This will only occur 1.28+. The JSONSchema will
# prevent an empty allocations object from being passed when there is
# no consumer generation, so this is safe to do.
# Get or create the project, user, consumer, and consumer type.
# This needs to be done in separate database transactions so that the
# records can be read after a create collision due to a racing request.
consumer, created_new_consumer, request_attr = (
data_util.ensure_consumer(
context, consumer_uuid, data.get('project_id'),
data.get('user_id'), data.get('consumer_generation'),
data.get('consumer_type'), want_version)
data.get('consumer_type'), want_version))
if not allocation_data:
# The allocations are empty, which means wipe them out. Internal
# to the allocation object this is signalled by a used value of 0.
# We verified the consumer's generation in util.ensure_consumer()
# NOTE(jaypipes): This will only occur 1.28+. The JSONSchema will
# prevent an empty allocations object from being passed when there is
# no consumer generation, so this is safe to do.
allocations = alloc_obj.get_all_by_consumer_id(context, consumer_uuid)
for allocation in allocations:
allocation.used = 0
@ -443,10 +442,7 @@ def _set_allocations_for_consumer_same_transaction(context, consumer_uuid,
# If the body includes an allocation for a resource provider
# that does not exist, raise a 400.
rp_objs = _resource_providers_by_uuid(context, allocation_data.keys())
consumer, created_new_consumer = data_util.ensure_consumer(
context, consumer_uuid, data.get('project_id'),
data.get('user_id'), data.get('consumer_generation'),
data.get('consumer_type'), want_version)
for resource_provider_uuid, allocation in allocation_data.items():
resource_provider = rp_objs[resource_provider_uuid]
new_allocations = _new_allocations(context,
@ -455,17 +451,29 @@ def _set_allocations_for_consumer_same_transaction(context, consumer_uuid,
allocation['resources'])
allocation_objects.extend(new_allocations)
def _create_allocations(alloc_list):
@db_api.placement_context_manager.writer
def _update_consumers_and_create_allocations(ctx):
# Update consumer attributes if requested attributes are different.
# NOTE(melwitt): This will not raise ConcurrentUpdateDetected, that
# happens later in AllocationList.replace_all()
data_util.update_consumers([consumer], {consumer_uuid: request_attr})
alloc_obj.replace_all(ctx, allocation_objects)
LOG.debug("Successfully wrote allocations %s", allocation_objects)
def _create_allocations():
try:
alloc_obj.replace_all(context, alloc_list)
LOG.debug("Successfully wrote allocations %s", alloc_list)
# NOTE(melwitt): Group the consumer and allocation database updates
# in a single transaction so that updates get rolled back
# automatically in the event of a consumer generation conflict.
_update_consumers_and_create_allocations(context)
except Exception:
with excutils.save_and_reraise_exception():
if created_new_consumer:
delete_consumers([consumer])
try:
_create_allocations(allocation_objects)
_create_allocations()
# InvalidInventory is a parent for several exceptions that
# indicate either that Inventory is not present, or that
# capacity limits have been exceeded.
@ -482,6 +490,10 @@ def _set_allocations_for_consumer_same_transaction(context, consumer_uuid,
'allocate: %(error)s' % {'error': exc},
comment=errors.CONCURRENT_UPDATE)
req.response.status = 204
req.response.content_type = None
return req.response
@wsgi_wrapper.PlacementWsgify
@microversion.version_handler('1.0', '1.7')
@ -541,24 +553,36 @@ def set_allocations(req):
want_schema = schema.POST_ALLOCATIONS_V1_38
data = util.extract_json(req.body, want_schema)
consumers, new_consumers_created = inspect_consumers(
consumers, new_consumers_created, requested_attrs = inspect_consumers(
context, data, want_version)
# Create a sequence of allocation objects to be used in one
# alloc_obj.replace_all() call, which will mean all the changes
# happen within a single transaction and with resource provider
# and consumer generations (if applicable) check all in one go.
# alloc_obj.replace_all() call, which will mean all the changes happen
# within a single transaction and with resource provider and consumer
# generations (if applicable) check all in one go.
allocations = create_allocation_list(context, data, consumers)
def _create_allocations(alloc_list):
@db_api.placement_context_manager.writer
def _update_consumers_and_create_allocations(ctx):
# Update consumer attributes if requested attributes are different.
# NOTE(melwitt): This will not raise ConcurrentUpdateDetected, that
# happens later in AllocationList.replace_all()
data_util.update_consumers(consumers.values(), requested_attrs)
alloc_obj.replace_all(ctx, allocations)
LOG.debug("Successfully wrote allocations %s", allocations)
def _create_allocations():
try:
alloc_obj.replace_all(context, alloc_list)
LOG.debug("Successfully wrote allocations %s", alloc_list)
# NOTE(melwitt): Group the consumer and allocation database updates
# in a single transaction so that updates get rolled back
# automatically in the event of a consumer generation conflict.
_update_consumers_and_create_allocations(context)
except Exception:
with excutils.save_and_reraise_exception():
delete_consumers(new_consumers_created)
try:
_create_allocations(allocations)
_create_allocations()
except exception.NotFound as exc:
raise webob.exc.HTTPBadRequest(
"Unable to allocate inventory %(error)s" % {'error': exc})

View File

@ -22,12 +22,14 @@ import copy
from oslo_utils import excutils
import webob
from placement import db_api
from placement import errors
from placement import exception
# TODO(cdent): That we are doing this suggests that there's stuff to be
# extracted from the handler to a shared module.
from placement.handlers import allocation
from placement.handlers import inventory
from placement.handlers import util as data_util
from placement import microversion
from placement.objects import reshaper
from placement.objects import resource_provider as rp_obj
@ -90,25 +92,38 @@ def reshape(req):
inventory_by_rp[resource_provider] = inv_list
# Make the consumer objects associated with the allocations.
consumers, new_consumers_created = allocation.inspect_consumers(
context, allocations, want_version)
consumers, new_consumers_created, requested_attrs = (
allocation.inspect_consumers(context, allocations, want_version))
# Nest exception handling so that any exception results in new consumer
# objects being deleted, then reraise for translating to HTTP exceptions.
try:
# When these allocations are created they get resource provider objects
# which are different instances (usually with the same data) from those
# loaded above when creating inventory objects. The reshape method below
# is responsible for ensuring that the resource providers and their
# generations do not conflict.
allocation_objects = allocation.create_allocation_list(
context, allocations, consumers)
@db_api.placement_context_manager.writer
def _update_consumers_and_create_allocations(ctx):
# Update consumer attributes if requested attributes are different.
# NOTE(melwitt): This will not raise ConcurrentUpdateDetected, that
# happens later in AllocationList.replace_all()
data_util.update_consumers(consumers.values(), requested_attrs)
reshaper.reshape(ctx, inventory_by_rp, allocation_objects)
def _create_allocations():
try:
# When these allocations are created they get resource provider
# objects which are different instances (usually with the same
# data) from those loaded above when creating inventory objects.
# The reshape method below is responsible for ensuring that the
# resource providers and their generations do not conflict.
allocation_objects = allocation.create_allocation_list(
context, allocations, consumers)
reshaper.reshape(context, inventory_by_rp, allocation_objects)
# NOTE(melwitt): Group the consumer and allocation database updates
# in a single transaction so that updates get rolled back
# automatically in the event of a consumer generation conflict.
_update_consumers_and_create_allocations(context)
except Exception:
with excutils.save_and_reraise_exception():
allocation.delete_consumers(new_consumers_created)
try:
_create_allocations()
# Generation conflict is a (rare) possibility in a few different
# places in reshape().
except exception.ConcurrentUpdateDetected as exc:

View File

@ -11,6 +11,8 @@
# under the License.
"""DB Utility methods for placement."""
import collections
from oslo_log import log as logging
import webob
@ -24,6 +26,9 @@ from placement.objects import user as user_obj
LOG = logging.getLogger(__name__)
RequestAttr = collections.namedtuple('RequestAttr',
['project', 'user', 'consumer_type_id'])
def get_or_create_consumer_type_id(ctx, name):
"""Tries to fetch the provided consumer_type and creates a new one if it
@ -100,14 +105,13 @@ def ensure_consumer(ctx, consumer_uuid, project_id, user_id,
"""Ensures there are records in the consumers, projects and users table for
the supplied external identifiers.
Returns a tuple containing the populated Consumer object containing Project
and User sub-objects and a boolean indicating whether a new Consumer object
was created (as opposed to an existing consumer record retrieved)
:note: If the supplied project or user external identifiers do not match an
existing consumer's project and user identifiers, the existing
consumer's project and user IDs are updated to reflect the supplied
ones.
Returns a 3-tuple containing:
- the populated Consumer object containing Project and User sub-objects
- a boolean indicating whether a new Consumer object was created
(as opposed to an existing consumer record retrieved)
- a dict of RequestAttr objects by consumer_uuid which contains the
requested Project, User, and consumer type ID (which may be different
than what is contained in an existing consumer record retrieved)
:param ctx: The request context.
:param consumer_uuid: The uuid of the consumer of the resources.
@ -129,6 +133,8 @@ def ensure_consumer(ctx, consumer_uuid, project_id, user_id,
proj = _get_or_create_project(ctx, project_id)
user = _get_or_create_user(ctx, user_id)
cons_type_id = None
try:
consumer = consumer_obj.Consumer.get_by_uuid(ctx, consumer_uuid)
if requires_consumer_generation:
@ -141,6 +147,57 @@ def ensure_consumer(ctx, consumer_uuid, project_id, user_id,
'got_gen': consumer_generation,
},
comment=errors.CONCURRENT_UPDATE)
if requires_consumer_type:
cons_type_id = get_or_create_consumer_type_id(ctx, consumer_type)
except exception.NotFound:
# If we are attempting to modify or create allocations after 1.26, we
# need a consumer generation specified. The user must have specified
# None for the consumer generation if we get here, since there was no
# existing consumer with this UUID and therefore the user should be
# indicating that they expect the consumer did not exist.
if requires_consumer_generation:
if consumer_generation is not None:
raise webob.exc.HTTPConflict(
'consumer generation conflict - '
'expected null but got %s' % consumer_generation,
comment=errors.CONCURRENT_UPDATE)
if requires_consumer_type:
cons_type_id = get_or_create_consumer_type_id(ctx, consumer_type)
# No such consumer. This is common for new allocations. Create the
# consumer record
consumer, created_new_consumer = _create_consumer(
ctx, consumer_uuid, proj, user, cons_type_id)
# Also return the project, user, and consumer type from the request to use
# for rollbacks.
request_attr = RequestAttr(proj, user, cons_type_id)
return consumer, created_new_consumer, request_attr
def update_consumers(consumers, request_attrs):
"""Update consumers with the requested Project, User, and consumer type ID
if they are different.
If the supplied project or user external identifiers do not match an
existing consumer's project and user identifiers, the existing consumer's
project and user IDs are updated to reflect the supplied ones.
If the supplied consumer types do not match an existing consumer's consumer
type, the existing consumer's consumer types are updated to reflect the
supplied ones.
:param consumers: a list of Consumer objects
:param request_attrs: a dict of RequestAttr objects by consumer_uuid
"""
for consumer in consumers:
request_attr = request_attrs[consumer.uuid]
project = request_attr.project
user = request_attr.user
# Note: this can be None if the request microversion is < 1.38.
consumer_type_id = request_attr.consumer_type_id
# NOTE(jaypipes): The user may have specified a different project and
# user external ID than the one that we had for the consumer. If this
# is the case, go ahead and modify the consumer record with the
@ -158,46 +215,26 @@ def ensure_consumer(ctx, consumer_uuid, project_id, user_id,
# resource provider generation conflict or out of resources failure),
# we will end up deleting the auto-created consumer and we will undo
# the changes to the second consumer's project and user ID.
#
# NOTE(melwitt): The aforementioned rollback of changes is predicated
# on the fact that the same transaction context is used for both
# util.ensure_consumer() and AllocationList.replace_all() within the
# util.update_consumers() and AllocationList.replace_all() within the
# same HTTP request. The @db_api.placement_context_manager.writer
# decorator on the outermost method will nest to methods called within
# the outermost method.
if (project_id != consumer.project.external_id or
user_id != consumer.user.external_id):
if (project.external_id != consumer.project.external_id or
user.external_id != consumer.user.external_id):
LOG.debug("Supplied project or user ID for consumer %s was "
"different than existing record. Updating consumer "
"record.", consumer_uuid)
consumer.project = proj
"record.", consumer.uuid)
consumer.project = project
consumer.user = user
consumer.update()
# Update the consumer type if it's different than the existing one.
if requires_consumer_type:
cons_type_id = get_or_create_consumer_type_id(ctx, consumer_type)
if cons_type_id != consumer.consumer_type_id:
LOG.debug("Supplied consumer type for consumer %s was "
"different than existing record. Updating "
"consumer record.", consumer_uuid)
consumer.consumer_type_id = cons_type_id
consumer.update()
except exception.NotFound:
# If we are attempting to modify or create allocations after 1.26, we
# need a consumer generation specified. The user must have specified
# None for the consumer generation if we get here, since there was no
# existing consumer with this UUID and therefore the user should be
# indicating that they expect the consumer did not exist.
if requires_consumer_generation:
if consumer_generation is not None:
raise webob.exc.HTTPConflict(
'consumer generation conflict - '
'expected null but got %s' % consumer_generation,
comment=errors.CONCURRENT_UPDATE)
cons_type_id = (get_or_create_consumer_type_id(ctx, consumer_type)
if requires_consumer_type else None)
# No such consumer. This is common for new allocations. Create the
# consumer record
consumer, created_new_consumer = _create_consumer(
ctx, consumer_uuid, proj, user, cons_type_id)
return consumer, created_new_consumer
# Update the consumer type if it's different than the existing one.
if consumer_type_id and consumer_type_id != consumer.consumer_type_id:
LOG.debug("Supplied consumer type for consumer %s was "
"different than existing record. Updating "
"consumer record.", consumer.uuid)
consumer.consumer_type_id = consumer_type_id
consumer.update()

View File

@ -511,8 +511,17 @@ def replace_all(context, alloc_list):
alloc.resource_provider.uuid for alloc in alloc_list)
seen_rps = {}
for rp_uuid in alloc_rp_uuids:
seen_rps[rp_uuid] = rp_obj.ResourceProvider.get_by_uuid(
context, rp_uuid)
# NOTE(melwitt): We use a separate database transaction to read
# the resource provider because we might be wrapped in an outer
# database transaction when we reach here. We want to get an
# up-to-date generation value in case a racing request has
# changed it after we began an outer transaction and this is
# the first time we are reading the resource provider records
# during our transaction.
db_context_manager = db_api.placement_context_manager
with db_context_manager.reader.independent.using(context):
seen_rps[rp_uuid] = rp_obj.ResourceProvider.get_by_uuid(
context, rp_uuid)
for alloc in alloc_list:
rp_uuid = alloc.resource_provider.uuid
alloc.resource_provider = seen_rps[rp_uuid]

View File

@ -103,19 +103,10 @@ class TestAllocationProjectCreateRace(base.TestCase):
url = '/allocations/%s' % uuids.consumer
resp = client.put(url, data=alloc_data, headers=self.headers)
# FIXME(gibi) this is bug
# https://storyboard.openstack.org/#!/story/2009159 The expected
# behavior would be that the allocation update succeeds as the
# transaction can fetch the Project created by a racing transaction
#
# self.assertEqual(204, resp.status_code)
self.assertEqual(500, resp.status_code)
self.assertIn(
"This session is in 'inactive' state, due to the SQL "
"transaction being rolled back; no further SQL can be emitted "
"within this transaction.",
resp.json()['errors'][0]['detail']
)
self.assertEqual(204, resp.status_code)
def test_set_allocations(self):
alloc_data = jsonutils.dump_as_bytes({

View File

@ -243,9 +243,10 @@ class TestEnsureConsumer(base.ContextTestCase):
self.mock_consumer_get.return_value = consumer
consumer_gen = 1
util.ensure_consumer(
consumer, created_new_consumer, request_attr = util.ensure_consumer(
self.ctx, self.consumer_id, self.project_id, self.user_id,
consumer_gen, 'TYPE', self.cons_type_req_version)
util.update_consumers([consumer], {consumer.uuid: request_attr})
# Expect 1 call to update() to update to the supplied consumer type ID
self.mock_consumer_update.assert_called_once_with()
# Consumer should have the new consumer type from the cache
@ -272,9 +273,10 @@ class TestEnsureConsumer(base.ContextTestCase):
exception.ConsumerExists(uuid=uuidsentinel.consumer))
consumer_gen = 1
util.ensure_consumer(
consumer, created_new_consumer, request_attr = util.ensure_consumer(
self.ctx, self.consumer_id, self.project_id, self.user_id,
consumer_gen, 'TYPE', self.cons_type_req_version)
util.update_consumers([consumer], {consumer.uuid: request_attr})
# Expect 1 call to update() to update to the supplied consumer type ID
self.mock_consumer_update.assert_called_once_with()
# Consumer should have the new consumer type from the cache