Merge "Run revision bump operations en masse" into stable/queens

This commit is contained in:
Zuul 2019-11-26 11:41:38 +00:00 committed by Gerrit Code Review
commit a4633151d3
3 changed files with 173 additions and 29 deletions

View File

@ -19,6 +19,7 @@ import sqlalchemy as sa
from sqlalchemy import event # noqa from sqlalchemy import event # noqa
from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext import declarative from sqlalchemy.ext import declarative
from sqlalchemy.orm import attributes
from sqlalchemy.orm import session as se from sqlalchemy.orm import session as se
from neutron._i18n import _ from neutron._i18n import _
@ -76,6 +77,16 @@ class StandardAttribute(model_base.BASEV2):
return return
self.revision_number += 1 self.revision_number += 1
def _set_updated_revision_number(self, revision_number, updated_at):
attributes.set_committed_value(
self, "revision_number", revision_number)
attributes.set_committed_value(
self, "updated_at", updated_at)
@property
def _effective_standard_attribute_id(self):
return self.id
class HasStandardAttributes(object): class HasStandardAttributes(object):
@ -130,6 +141,10 @@ class HasStandardAttributes(object):
single_parent=True, single_parent=True,
uselist=False) uselist=False)
@property
def _effective_standard_attribute_id(self):
return self.standard_attr_id
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
standard_attr_keys = ['description', 'created_at', standard_attr_keys = ['description', 'created_at',
'updated_at', 'revision_number'] 'updated_at', 'revision_number']
@ -172,6 +187,10 @@ class HasStandardAttributes(object):
# modified (e.g. fixed_ips change should bump port revision) # modified (e.g. fixed_ips change should bump port revision)
self.standard_attr.bump_revision() self.standard_attr.bump_revision()
def _set_updated_revision_number(self, revision_number, updated_at):
self.standard_attr._set_updated_revision_number(
revision_number, updated_at)
def get_standard_attr_resource_model_map(): def get_standard_attr_resource_model_map():
rs_map = {} rs_map = {}

View File

@ -35,7 +35,12 @@ class RevisionPlugin(service_base.ServicePluginBase):
def __init__(self): def __init__(self):
super(RevisionPlugin, self).__init__() super(RevisionPlugin, self).__init__()
# background on these event hooks:
# https://docs.sqlalchemy.org/en/latest/orm/session_events.html
db_api.sqla_listen(se.Session, 'before_flush', self.bump_revisions) db_api.sqla_listen(se.Session, 'before_flush', self.bump_revisions)
db_api.sqla_listen(
se.Session, "after_flush_postexec",
self._emit_related_revision_bumps)
db_api.sqla_listen(se.Session, 'after_commit', db_api.sqla_listen(se.Session, 'after_commit',
self._clear_rev_bumped_flags) self._clear_rev_bumped_flags)
db_api.sqla_listen(se.Session, 'after_rollback', db_api.sqla_listen(se.Session, 'after_rollback',
@ -44,9 +49,12 @@ class RevisionPlugin(service_base.ServicePluginBase):
def bump_revisions(self, session, context, instances): def bump_revisions(self, session, context, instances):
self._enforce_if_match_constraints(session) self._enforce_if_match_constraints(session)
# bump revision number for any updated objects in the session # bump revision number for any updated objects in the session
for obj in session.dirty: self._bump_obj_revisions(
if isinstance(obj, standard_attr.HasStandardAttributes): session,
self._bump_obj_revision(session, obj) [
obj for obj in session.dirty
if isinstance(obj, standard_attr.HasStandardAttributes)]
)
# see if any created/updated/deleted objects bump the revision # see if any created/updated/deleted objects bump the revision
# of another object # of another object
@ -54,26 +62,44 @@ class RevisionPlugin(service_base.ServicePluginBase):
o for o in session.deleted | session.dirty | session.new o for o in session.deleted | session.dirty | session.new
if getattr(o, 'revises_on_change', ()) if getattr(o, 'revises_on_change', ())
] ]
for obj in objects_with_related_revisions: collected = session.info.setdefault('_related_bumped', set())
self._bump_related_revisions(session, obj) self._collect_related_tobump(
session, objects_with_related_revisions, collected)
def _bump_related_revisions(self, session, obj): def _emit_related_revision_bumps(self, session, context):
for revises_col in getattr(obj, 'revises_on_change', ()): # within after_flush_postexec, emit an UPDATE statement to increment
# revision flags for related objects that were located in the
# before_flush phase.
#
# note that this event isn't called if the flush fails;
# in that case, the transaction is rolled back and the
# after_rollback event will invoke self._clear_rev_bumped_flags
# to clean out state.
collected = session.info.get('_related_bumped', None)
if collected:
try: try:
related_obj = self._find_related_obj(session, obj, revises_col) self._bump_obj_revisions(
session, collected, version_check=False)
finally:
collected.clear()
def _collect_related_tobump(self, session, objects, collected):
for obj in objects:
if obj in collected:
continue
for revises_col in getattr(obj, 'revises_on_change', ()):
related_obj = self._find_related_obj(obj, revises_col)
if not related_obj: if not related_obj:
LOG.warning("Could not find related %(col)s for " LOG.warning("Could not find related %(col)s for "
"resource %(obj)s to bump revision.", "resource %(obj)s to bump revision.",
{'obj': obj, 'col': revises_col}) {'obj': obj, 'col': revises_col})
continue continue
# if related object revises others, bump those as well # if related object revises others, bump those as well
self._bump_related_revisions(session, related_obj) self._collect_related_tobump(session, [related_obj], collected)
# no need to bump revisions on related objects being deleted # no need to bump revisions on related objects being deleted
if related_obj not in session.deleted: if related_obj not in session.deleted:
self._bump_obj_revision(session, related_obj) collected.add(related_obj)
except exc.ObjectDeletedError: return collected
# object was in session but another writer deleted it
pass
def get_plugin_type(self): def get_plugin_type(self):
return "revision_plugin" return "revision_plugin"
@ -87,14 +113,19 @@ class RevisionPlugin(service_base.ServicePluginBase):
def extend_resource_dict_revision(resource_res, resource_db): def extend_resource_dict_revision(resource_res, resource_db):
resource_res['revision_number'] = resource_db.revision_number resource_res['revision_number'] = resource_db.revision_number
def _find_related_obj(self, session, obj, relationship_col): def _find_related_obj(self, obj, relationship_col):
"""Gets a related object off of a relationship. """Gets a related object off of a relationship.
Raises a runtime error if the relationship isn't configured correctly Raises a runtime error if the relationship isn't configured correctly
for revision bumping. for revision bumping.
""" """
# first check to see if it's directly attached to the object already # first check to see if it's directly attached to the object already
related_obj = getattr(obj, relationship_col) try:
related_obj = getattr(obj, relationship_col)
except exc.ObjectDeletedError:
# object was in session but another writer deleted it
return None
if related_obj: if related_obj:
return related_obj return related_obj
for rel in sqlalchemy.inspect(obj).mapper.relationships: for rel in sqlalchemy.inspect(obj).mapper.relationships:
@ -108,24 +139,101 @@ class RevisionPlugin(service_base.ServicePluginBase):
def _clear_rev_bumped_flags(self, session): def _clear_rev_bumped_flags(self, session):
"""This clears all flags on commit/rollback to enable rev bumps.""" """This clears all flags on commit/rollback to enable rev bumps."""
session.info.pop('_related_bumped', None)
for inst in session: for inst in session:
setattr(inst, '_rev_bumped', False) setattr(inst, '_rev_bumped', False)
def _bump_obj_revision(self, session, obj): def _bump_obj_revisions(self, session, objects, version_check=True):
"""Increment object revision in compare and swap fashion. """Increment object revisions.
If version_check=True, uses SQLAlchemy ORM's compare-and-swap feature
(known as "version_id_col" in the ORM mapping), which is part of the
StandardAttribute class.
If version_check=False, runs an UPDATE statement directly against
the set of all StandardAttribute objects at once, without using
any compare and swap logic.
If a revision number constraint rule was associated with the Session,
this is retrieved and each object is tested to see if it matches
this condition; if so, the constraint is enforced.
Before the increment, this checks and enforces any revision number
constraints.
""" """
if getattr(obj, '_rev_bumped', False):
# we've already bumped the revision of this object in this txn # filter objects for which we've already bumped the revision
to_bump = [
obj for obj in objects if not getattr(obj, '_rev_bumped', False)]
if not to_bump:
return return
self._run_constrained_instance_match_check(session, to_bump)
if not version_check:
# this UPDATE statement could alternatively be written to run
# as an UPDATE-per-object with Python-generated revision numbers
# as parameters.
session.query(standard_attr.StandardAttribute).filter(
standard_attr.StandardAttribute.id.in_(
[obj._effective_standard_attribute_id for obj in to_bump]
)
).update({
# note that SQLAlchemy runs the onupdate function for
# the updated_at column and applies it to the SET clause as
# well.
standard_attr.StandardAttribute.revision_number:
standard_attr.StandardAttribute.revision_number + 1},
synchronize_session=False)
# run a SELECT to get back the new values we just generated.
# if MySQL supported RETURNING, we could get these numbers
# back from the UPDATE without running another SELECT.
retrieve_revision_numbers = {
row.id: (row.revision_number, row.updated_at)
for row in
session.query(
standard_attr.StandardAttribute.id,
standard_attr.StandardAttribute.revision_number,
standard_attr.StandardAttribute.updated_at,
).filter(
standard_attr.StandardAttribute.id.in_(
[
obj._effective_standard_attribute_id
for obj in to_bump
]
)
)
}
for obj in to_bump:
if version_check:
# full version check, run the ORM routine to UPDATE
# the row with a WHERE clause
obj.bump_revision()
else:
# no version check - get back what we did in our one-step
# UPDATE statement and set it without causing change in
# ORM flush state
try:
new_version_id, new_updated_at = retrieve_revision_numbers[
obj._effective_standard_attribute_id
]
except KeyError:
# in case the object was deleted concurrently
LOG.warning(
"No standard attr row found for resource: %(obj)s",
{'obj': obj})
else:
obj._set_updated_revision_number(
new_version_id, new_updated_at)
setattr(obj, '_rev_bumped', True)
def _run_constrained_instance_match_check(self, session, objects):
instance, match = self._get_constrained_instance_match(session) instance, match = self._get_constrained_instance_match(session)
if instance and instance == obj: for obj in objects:
# one last check before bumping revision if instance and instance == obj:
self._enforce_if_match_constraints(session) # one last check before bumping revision
obj.bump_revision() self._enforce_if_match_constraints(session)
setattr(obj, '_rev_bumped', True)
def _find_instance_by_column_value(self, session, model, column, value): def _find_instance_by_column_value(self, session, model, column, value):
"""Lookup object in session or from DB based on a column's value.""" """Lookup object in session or from DB based on a column's value."""

View File

@ -79,10 +79,27 @@ class TestRevisionPlugin(test_plugin.Ml2PluginV2TestCase):
other_ctx.session.delete( other_ctx.session.delete(
other_ctx.session.query(models_v2.Port).first() other_ctx.session.query(models_v2.Port).first()
) )
# expire the port so the revision bumping code will trigger a other_ctx.session.flush()
# lookup on its attributes and encounter an ObjectDeletedError
# ensure no attribute lookups are attempted on an
# object deleted from the session when doing related
# bumps
self.ctx.session.expire(port) self.ctx.session.expire(port)
rp._bump_related_revisions(self.ctx.session, ipal_obj)
collected = rp._collect_related_tobump(
self.ctx.session, [ipal_obj], set())
rp._bump_obj_revisions(
self.ctx.session, collected, version_check=False)
def test_shared_network_create(self):
# this test intends to run db_base_plugin_v2 -> create_network_db,
# which in turn creates a Network and then a NetworkRBAC object.
# An issue was observed with the revision_plugin which would interfere
# with the flush process that occurs with these two connected objects,
# creating two copies of the Network object in the Session and putting
# it into an invalid state.
with self.network(shared=True):
pass
def test_port_name_update_revises(self): def test_port_name_update_revises(self):
with self.port() as port: with self.port() as port: