Run revision bump operations en masse

Refactored RevisionPlugin to operate upon sets of objects at
once.   In the case of "related" objects, the bump operation
no longer makes use of compare-and-swap and instead updates
version numbers directly without testing for their previous value.
This removes the issue of StaleDataErrors being
prevalent within the related object update phase, given the assumption
that the compare-and-swap logic is only desireable for the primary object
being updated.

Change-Id: I2fef298041c59a03dfd06912764973995b80690c
This commit is contained in:
Mike Bayer 2019-02-07 16:59:51 -05:00
parent 9a15084375
commit d841ce72bf
3 changed files with 173 additions and 29 deletions

View File

@ -19,6 +19,7 @@ import sqlalchemy as sa
from sqlalchemy import event # noqa
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext import declarative
from sqlalchemy.orm import attributes
from sqlalchemy.orm import session as se
from neutron._i18n import _
@ -76,6 +77,16 @@ class StandardAttribute(model_base.BASEV2):
return
self.revision_number += 1
def _set_updated_revision_number(self, revision_number, updated_at):
attributes.set_committed_value(
self, "revision_number", revision_number)
attributes.set_committed_value(
self, "updated_at", updated_at)
@property
def _effective_standard_attribute_id(self):
return self.id
class HasStandardAttributes(object):
@ -146,6 +157,10 @@ class HasStandardAttributes(object):
single_parent=True,
uselist=False)
@property
def _effective_standard_attribute_id(self):
return self.standard_attr_id
def __init__(self, *args, **kwargs):
standard_attr_keys = ['description', 'created_at',
'updated_at', 'revision_number']
@ -188,6 +203,10 @@ class HasStandardAttributes(object):
# modified (e.g. fixed_ips change should bump port revision)
self.standard_attr.bump_revision()
def _set_updated_revision_number(self, revision_number, updated_at):
self.standard_attr._set_updated_revision_number(
revision_number, updated_at)
def _resource_model_map_helper(rs_map, resource, subclass):
if resource in rs_map:

View File

@ -37,7 +37,12 @@ class RevisionPlugin(service_base.ServicePluginBase):
def __init__(self):
super(RevisionPlugin, self).__init__()
# background on these event hooks:
# https://docs.sqlalchemy.org/en/latest/orm/session_events.html
db_api.sqla_listen(se.Session, 'before_flush', self.bump_revisions)
db_api.sqla_listen(
se.Session, "after_flush_postexec",
self._emit_related_revision_bumps)
db_api.sqla_listen(se.Session, 'after_commit',
self._clear_rev_bumped_flags)
db_api.sqla_listen(se.Session, 'after_rollback',
@ -46,9 +51,12 @@ class RevisionPlugin(service_base.ServicePluginBase):
def bump_revisions(self, session, context, instances):
self._enforce_if_match_constraints(session)
# bump revision number for any updated objects in the session
for obj in session.dirty:
if isinstance(obj, standard_attr.HasStandardAttributes):
self._bump_obj_revision(session, obj)
self._bump_obj_revisions(
session,
[
obj for obj in session.dirty
if isinstance(obj, standard_attr.HasStandardAttributes)]
)
# see if any created/updated/deleted objects bump the revision
# of another object
@ -56,26 +64,44 @@ class RevisionPlugin(service_base.ServicePluginBase):
o for o in session.deleted | session.dirty | session.new
if getattr(o, 'revises_on_change', ())
]
for obj in objects_with_related_revisions:
self._bump_related_revisions(session, obj)
collected = session.info.setdefault('_related_bumped', set())
self._collect_related_tobump(
session, objects_with_related_revisions, collected)
def _bump_related_revisions(self, session, obj):
for revises_col in getattr(obj, 'revises_on_change', ()):
def _emit_related_revision_bumps(self, session, context):
# within after_flush_postexec, emit an UPDATE statement to increment
# revision flags for related objects that were located in the
# before_flush phase.
#
# note that this event isn't called if the flush fails;
# in that case, the transaction is rolled back and the
# after_rollback event will invoke self._clear_rev_bumped_flags
# to clean out state.
collected = session.info.get('_related_bumped', None)
if collected:
try:
related_obj = self._find_related_obj(session, obj, revises_col)
self._bump_obj_revisions(
session, collected, version_check=False)
finally:
collected.clear()
def _collect_related_tobump(self, session, objects, collected):
for obj in objects:
if obj in collected:
continue
for revises_col in getattr(obj, 'revises_on_change', ()):
related_obj = self._find_related_obj(obj, revises_col)
if not related_obj:
LOG.warning("Could not find related %(col)s for "
"resource %(obj)s to bump revision.",
{'obj': obj, 'col': revises_col})
continue
# if related object revises others, bump those as well
self._bump_related_revisions(session, related_obj)
self._collect_related_tobump(session, [related_obj], collected)
# no need to bump revisions on related objects being deleted
if related_obj not in session.deleted:
self._bump_obj_revision(session, related_obj)
except exc.ObjectDeletedError:
# object was in session but another writer deleted it
pass
collected.add(related_obj)
return collected
def get_plugin_type(self):
return "revision_plugin"
@ -89,14 +115,19 @@ class RevisionPlugin(service_base.ServicePluginBase):
def extend_resource_dict_revision(resource_res, resource_db):
resource_res['revision_number'] = resource_db.revision_number
def _find_related_obj(self, session, obj, relationship_col):
def _find_related_obj(self, obj, relationship_col):
"""Gets a related object off of a relationship.
Raises a runtime error if the relationship isn't configured correctly
for revision bumping.
"""
# first check to see if it's directly attached to the object already
related_obj = getattr(obj, relationship_col)
try:
related_obj = getattr(obj, relationship_col)
except exc.ObjectDeletedError:
# object was in session but another writer deleted it
return None
if related_obj:
return related_obj
for rel in sqlalchemy.inspect(obj).mapper.relationships:
@ -110,24 +141,101 @@ class RevisionPlugin(service_base.ServicePluginBase):
def _clear_rev_bumped_flags(self, session):
"""This clears all flags on commit/rollback to enable rev bumps."""
session.info.pop('_related_bumped', None)
for inst in session:
setattr(inst, '_rev_bumped', False)
def _bump_obj_revision(self, session, obj):
"""Increment object revision in compare and swap fashion.
def _bump_obj_revisions(self, session, objects, version_check=True):
"""Increment object revisions.
If version_check=True, uses SQLAlchemy ORM's compare-and-swap feature
(known as "version_id_col" in the ORM mapping), which is part of the
StandardAttribute class.
If version_check=False, runs an UPDATE statement directly against
the set of all StandardAttribute objects at once, without using
any compare and swap logic.
If a revision number constraint rule was associated with the Session,
this is retrieved and each object is tested to see if it matches
this condition; if so, the constraint is enforced.
Before the increment, this checks and enforces any revision number
constraints.
"""
if getattr(obj, '_rev_bumped', False):
# we've already bumped the revision of this object in this txn
# filter objects for which we've already bumped the revision
to_bump = [
obj for obj in objects if not getattr(obj, '_rev_bumped', False)]
if not to_bump:
return
self._run_constrained_instance_match_check(session, to_bump)
if not version_check:
# this UPDATE statement could alternatively be written to run
# as an UPDATE-per-object with Python-generated revision numbers
# as parameters.
session.query(standard_attr.StandardAttribute).filter(
standard_attr.StandardAttribute.id.in_(
[obj._effective_standard_attribute_id for obj in to_bump]
)
).update({
# note that SQLAlchemy runs the onupdate function for
# the updated_at column and applies it to the SET clause as
# well.
standard_attr.StandardAttribute.revision_number:
standard_attr.StandardAttribute.revision_number + 1},
synchronize_session=False)
# run a SELECT to get back the new values we just generated.
# if MySQL supported RETURNING, we could get these numbers
# back from the UPDATE without running another SELECT.
retrieve_revision_numbers = {
row.id: (row.revision_number, row.updated_at)
for row in
session.query(
standard_attr.StandardAttribute.id,
standard_attr.StandardAttribute.revision_number,
standard_attr.StandardAttribute.updated_at,
).filter(
standard_attr.StandardAttribute.id.in_(
[
obj._effective_standard_attribute_id
for obj in to_bump
]
)
)
}
for obj in to_bump:
if version_check:
# full version check, run the ORM routine to UPDATE
# the row with a WHERE clause
obj.bump_revision()
else:
# no version check - get back what we did in our one-step
# UPDATE statement and set it without causing change in
# ORM flush state
try:
new_version_id, new_updated_at = retrieve_revision_numbers[
obj._effective_standard_attribute_id
]
except KeyError:
# in case the object was deleted concurrently
LOG.warning(
"No standard attr row found for resource: %(obj)s",
{'obj': obj})
else:
obj._set_updated_revision_number(
new_version_id, new_updated_at)
setattr(obj, '_rev_bumped', True)
def _run_constrained_instance_match_check(self, session, objects):
instance, match = self._get_constrained_instance_match(session)
if instance and instance == obj:
# one last check before bumping revision
self._enforce_if_match_constraints(session)
obj.bump_revision()
setattr(obj, '_rev_bumped', True)
for obj in objects:
if instance and instance == obj:
# one last check before bumping revision
self._enforce_if_match_constraints(session)
def _find_instance_by_column_value(self, session, model, column, value):
"""Lookup object in session or from DB based on a column's value."""

View File

@ -78,10 +78,27 @@ class TestRevisionPlugin(test_plugin.Ml2PluginV2TestCase):
other_ctx.session.delete(
other_ctx.session.query(models_v2.Port).first()
)
# expire the port so the revision bumping code will trigger a
# lookup on its attributes and encounter an ObjectDeletedError
other_ctx.session.flush()
# ensure no attribute lookups are attempted on an
# object deleted from the session when doing related
# bumps
self.ctx.session.expire(port)
rp._bump_related_revisions(self.ctx.session, ipal_obj)
collected = rp._collect_related_tobump(
self.ctx.session, [ipal_obj], set())
rp._bump_obj_revisions(
self.ctx.session, collected, version_check=False)
def test_shared_network_create(self):
# this test intends to run db_base_plugin_v2 -> create_network_db,
# which in turn creates a Network and then a NetworkRBAC object.
# An issue was observed with the revision_plugin which would interfere
# with the flush process that occurs with these two connected objects,
# creating two copies of the Network object in the Session and putting
# it into an invalid state.
with self.network(shared=True):
pass
def test_port_name_update_revises(self):
with self.port() as port: