From 134ab39ee4579951bf24364520bd7ee576b88cfb Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Thu, 7 Feb 2019 16:59:51 -0500 Subject: [PATCH] Run revision bump operations en masse Refactored RevisionPlugin to operate upon sets of objects at once. In the case of "related" objects, the bump operation no longer makes use of compare-and-swap and instead updates version numbers directly without testing for their previous value. This removes the issue of StaleDataErrors being prevalent within the related object update phase, given the assumption that the compare-and-swap logic is only desireable for the primary object being updated. Change-Id: I2fef298041c59a03dfd06912764973995b80690c (cherry picked from commit d841ce72bfa1ebcdb06f3fd5fba6cc68a4f04cb6) --- neutron/db/standard_attr.py | 19 +++ neutron/services/revisions/revision_plugin.py | 160 +++++++++++++++--- .../revisions/test_revision_plugin.py | 23 ++- 3 files changed, 173 insertions(+), 29 deletions(-) diff --git a/neutron/db/standard_attr.py b/neutron/db/standard_attr.py index ad89a6474bf..857e1aa1595 100644 --- a/neutron/db/standard_attr.py +++ b/neutron/db/standard_attr.py @@ -19,6 +19,7 @@ import sqlalchemy as sa from sqlalchemy import event # noqa from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext import declarative +from sqlalchemy.orm import attributes from sqlalchemy.orm import session as se from neutron._i18n import _ @@ -76,6 +77,16 @@ class StandardAttribute(model_base.BASEV2): return self.revision_number += 1 + def _set_updated_revision_number(self, revision_number, updated_at): + attributes.set_committed_value( + self, "revision_number", revision_number) + attributes.set_committed_value( + self, "updated_at", updated_at) + + @property + def _effective_standard_attribute_id(self): + return self.id + class HasStandardAttributes(object): @@ -130,6 +141,10 @@ class HasStandardAttributes(object): single_parent=True, uselist=False) + @property + def _effective_standard_attribute_id(self): + return self.standard_attr_id + def __init__(self, *args, **kwargs): standard_attr_keys = ['description', 'created_at', 'updated_at', 'revision_number'] @@ -172,6 +187,10 @@ class HasStandardAttributes(object): # modified (e.g. fixed_ips change should bump port revision) self.standard_attr.bump_revision() + def _set_updated_revision_number(self, revision_number, updated_at): + self.standard_attr._set_updated_revision_number( + revision_number, updated_at) + def get_standard_attr_resource_model_map(): rs_map = {} diff --git a/neutron/services/revisions/revision_plugin.py b/neutron/services/revisions/revision_plugin.py index 9115cf5ac9c..6e449f84eec 100644 --- a/neutron/services/revisions/revision_plugin.py +++ b/neutron/services/revisions/revision_plugin.py @@ -35,7 +35,12 @@ class RevisionPlugin(service_base.ServicePluginBase): def __init__(self): super(RevisionPlugin, self).__init__() + # background on these event hooks: + # https://docs.sqlalchemy.org/en/latest/orm/session_events.html db_api.sqla_listen(se.Session, 'before_flush', self.bump_revisions) + db_api.sqla_listen( + se.Session, "after_flush_postexec", + self._emit_related_revision_bumps) db_api.sqla_listen(se.Session, 'after_commit', self._clear_rev_bumped_flags) db_api.sqla_listen(se.Session, 'after_rollback', @@ -44,9 +49,12 @@ class RevisionPlugin(service_base.ServicePluginBase): def bump_revisions(self, session, context, instances): self._enforce_if_match_constraints(session) # bump revision number for any updated objects in the session - for obj in session.dirty: - if isinstance(obj, standard_attr.HasStandardAttributes): - self._bump_obj_revision(session, obj) + self._bump_obj_revisions( + session, + [ + obj for obj in session.dirty + if isinstance(obj, standard_attr.HasStandardAttributes)] + ) # see if any created/updated/deleted objects bump the revision # of another object @@ -54,26 +62,44 @@ class RevisionPlugin(service_base.ServicePluginBase): o for o in session.deleted | session.dirty | session.new if getattr(o, 'revises_on_change', ()) ] - for obj in objects_with_related_revisions: - self._bump_related_revisions(session, obj) + collected = session.info.setdefault('_related_bumped', set()) + self._collect_related_tobump( + session, objects_with_related_revisions, collected) - def _bump_related_revisions(self, session, obj): - for revises_col in getattr(obj, 'revises_on_change', ()): + def _emit_related_revision_bumps(self, session, context): + # within after_flush_postexec, emit an UPDATE statement to increment + # revision flags for related objects that were located in the + # before_flush phase. + # + # note that this event isn't called if the flush fails; + # in that case, the transaction is rolled back and the + # after_rollback event will invoke self._clear_rev_bumped_flags + # to clean out state. + collected = session.info.get('_related_bumped', None) + if collected: try: - related_obj = self._find_related_obj(session, obj, revises_col) + self._bump_obj_revisions( + session, collected, version_check=False) + finally: + collected.clear() + + def _collect_related_tobump(self, session, objects, collected): + for obj in objects: + if obj in collected: + continue + for revises_col in getattr(obj, 'revises_on_change', ()): + related_obj = self._find_related_obj(obj, revises_col) if not related_obj: LOG.warning("Could not find related %(col)s for " "resource %(obj)s to bump revision.", {'obj': obj, 'col': revises_col}) continue # if related object revises others, bump those as well - self._bump_related_revisions(session, related_obj) + self._collect_related_tobump(session, [related_obj], collected) # no need to bump revisions on related objects being deleted if related_obj not in session.deleted: - self._bump_obj_revision(session, related_obj) - except exc.ObjectDeletedError: - # object was in session but another writer deleted it - pass + collected.add(related_obj) + return collected def get_plugin_type(self): return "revision_plugin" @@ -87,14 +113,19 @@ class RevisionPlugin(service_base.ServicePluginBase): def extend_resource_dict_revision(resource_res, resource_db): resource_res['revision_number'] = resource_db.revision_number - def _find_related_obj(self, session, obj, relationship_col): + def _find_related_obj(self, obj, relationship_col): """Gets a related object off of a relationship. Raises a runtime error if the relationship isn't configured correctly for revision bumping. """ # first check to see if it's directly attached to the object already - related_obj = getattr(obj, relationship_col) + try: + related_obj = getattr(obj, relationship_col) + except exc.ObjectDeletedError: + # object was in session but another writer deleted it + return None + if related_obj: return related_obj for rel in sqlalchemy.inspect(obj).mapper.relationships: @@ -108,24 +139,101 @@ class RevisionPlugin(service_base.ServicePluginBase): def _clear_rev_bumped_flags(self, session): """This clears all flags on commit/rollback to enable rev bumps.""" + session.info.pop('_related_bumped', None) for inst in session: setattr(inst, '_rev_bumped', False) - def _bump_obj_revision(self, session, obj): - """Increment object revision in compare and swap fashion. + def _bump_obj_revisions(self, session, objects, version_check=True): + """Increment object revisions. + + If version_check=True, uses SQLAlchemy ORM's compare-and-swap feature + (known as "version_id_col" in the ORM mapping), which is part of the + StandardAttribute class. + + If version_check=False, runs an UPDATE statement directly against + the set of all StandardAttribute objects at once, without using + any compare and swap logic. + + If a revision number constraint rule was associated with the Session, + this is retrieved and each object is tested to see if it matches + this condition; if so, the constraint is enforced. - Before the increment, this checks and enforces any revision number - constraints. """ - if getattr(obj, '_rev_bumped', False): - # we've already bumped the revision of this object in this txn + + # filter objects for which we've already bumped the revision + to_bump = [ + obj for obj in objects if not getattr(obj, '_rev_bumped', False)] + + if not to_bump: return + + self._run_constrained_instance_match_check(session, to_bump) + + if not version_check: + # this UPDATE statement could alternatively be written to run + # as an UPDATE-per-object with Python-generated revision numbers + # as parameters. + session.query(standard_attr.StandardAttribute).filter( + standard_attr.StandardAttribute.id.in_( + [obj._effective_standard_attribute_id for obj in to_bump] + ) + ).update({ + # note that SQLAlchemy runs the onupdate function for + # the updated_at column and applies it to the SET clause as + # well. + standard_attr.StandardAttribute.revision_number: + standard_attr.StandardAttribute.revision_number + 1}, + synchronize_session=False) + + # run a SELECT to get back the new values we just generated. + # if MySQL supported RETURNING, we could get these numbers + # back from the UPDATE without running another SELECT. + retrieve_revision_numbers = { + row.id: (row.revision_number, row.updated_at) + for row in + session.query( + standard_attr.StandardAttribute.id, + standard_attr.StandardAttribute.revision_number, + standard_attr.StandardAttribute.updated_at, + ).filter( + standard_attr.StandardAttribute.id.in_( + [ + obj._effective_standard_attribute_id + for obj in to_bump + ] + ) + ) + } + + for obj in to_bump: + if version_check: + # full version check, run the ORM routine to UPDATE + # the row with a WHERE clause + obj.bump_revision() + else: + # no version check - get back what we did in our one-step + # UPDATE statement and set it without causing change in + # ORM flush state + try: + new_version_id, new_updated_at = retrieve_revision_numbers[ + obj._effective_standard_attribute_id + ] + except KeyError: + # in case the object was deleted concurrently + LOG.warning( + "No standard attr row found for resource: %(obj)s", + {'obj': obj}) + else: + obj._set_updated_revision_number( + new_version_id, new_updated_at) + setattr(obj, '_rev_bumped', True) + + def _run_constrained_instance_match_check(self, session, objects): instance, match = self._get_constrained_instance_match(session) - if instance and instance == obj: - # one last check before bumping revision - self._enforce_if_match_constraints(session) - obj.bump_revision() - setattr(obj, '_rev_bumped', True) + for obj in objects: + if instance and instance == obj: + # one last check before bumping revision + self._enforce_if_match_constraints(session) def _find_instance_by_column_value(self, session, model, column, value): """Lookup object in session or from DB based on a column's value.""" diff --git a/neutron/tests/unit/services/revisions/test_revision_plugin.py b/neutron/tests/unit/services/revisions/test_revision_plugin.py index bafd6020077..84803dbc3b7 100644 --- a/neutron/tests/unit/services/revisions/test_revision_plugin.py +++ b/neutron/tests/unit/services/revisions/test_revision_plugin.py @@ -79,10 +79,27 @@ class TestRevisionPlugin(test_plugin.Ml2PluginV2TestCase): other_ctx.session.delete( other_ctx.session.query(models_v2.Port).first() ) - # expire the port so the revision bumping code will trigger a - # lookup on its attributes and encounter an ObjectDeletedError + other_ctx.session.flush() + + # ensure no attribute lookups are attempted on an + # object deleted from the session when doing related + # bumps self.ctx.session.expire(port) - rp._bump_related_revisions(self.ctx.session, ipal_obj) + + collected = rp._collect_related_tobump( + self.ctx.session, [ipal_obj], set()) + rp._bump_obj_revisions( + self.ctx.session, collected, version_check=False) + + def test_shared_network_create(self): + # this test intends to run db_base_plugin_v2 -> create_network_db, + # which in turn creates a Network and then a NetworkRBAC object. + # An issue was observed with the revision_plugin which would interfere + # with the flush process that occurs with these two connected objects, + # creating two copies of the Network object in the Session and putting + # it into an invalid state. + with self.network(shared=True): + pass def test_port_name_update_revises(self): with self.port() as port: