diff --git a/neutron/db/standard_attr.py b/neutron/db/standard_attr.py index ad89a6474bf..857e1aa1595 100644 --- a/neutron/db/standard_attr.py +++ b/neutron/db/standard_attr.py @@ -19,6 +19,7 @@ import sqlalchemy as sa from sqlalchemy import event # noqa from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext import declarative +from sqlalchemy.orm import attributes from sqlalchemy.orm import session as se from neutron._i18n import _ @@ -76,6 +77,16 @@ class StandardAttribute(model_base.BASEV2): return self.revision_number += 1 + def _set_updated_revision_number(self, revision_number, updated_at): + attributes.set_committed_value( + self, "revision_number", revision_number) + attributes.set_committed_value( + self, "updated_at", updated_at) + + @property + def _effective_standard_attribute_id(self): + return self.id + class HasStandardAttributes(object): @@ -130,6 +141,10 @@ class HasStandardAttributes(object): single_parent=True, uselist=False) + @property + def _effective_standard_attribute_id(self): + return self.standard_attr_id + def __init__(self, *args, **kwargs): standard_attr_keys = ['description', 'created_at', 'updated_at', 'revision_number'] @@ -172,6 +187,10 @@ class HasStandardAttributes(object): # modified (e.g. fixed_ips change should bump port revision) self.standard_attr.bump_revision() + def _set_updated_revision_number(self, revision_number, updated_at): + self.standard_attr._set_updated_revision_number( + revision_number, updated_at) + def get_standard_attr_resource_model_map(): rs_map = {} diff --git a/neutron/services/revisions/revision_plugin.py b/neutron/services/revisions/revision_plugin.py index 9115cf5ac9c..6e449f84eec 100644 --- a/neutron/services/revisions/revision_plugin.py +++ b/neutron/services/revisions/revision_plugin.py @@ -35,7 +35,12 @@ class RevisionPlugin(service_base.ServicePluginBase): def __init__(self): super(RevisionPlugin, self).__init__() + # background on these event hooks: + # https://docs.sqlalchemy.org/en/latest/orm/session_events.html db_api.sqla_listen(se.Session, 'before_flush', self.bump_revisions) + db_api.sqla_listen( + se.Session, "after_flush_postexec", + self._emit_related_revision_bumps) db_api.sqla_listen(se.Session, 'after_commit', self._clear_rev_bumped_flags) db_api.sqla_listen(se.Session, 'after_rollback', @@ -44,9 +49,12 @@ class RevisionPlugin(service_base.ServicePluginBase): def bump_revisions(self, session, context, instances): self._enforce_if_match_constraints(session) # bump revision number for any updated objects in the session - for obj in session.dirty: - if isinstance(obj, standard_attr.HasStandardAttributes): - self._bump_obj_revision(session, obj) + self._bump_obj_revisions( + session, + [ + obj for obj in session.dirty + if isinstance(obj, standard_attr.HasStandardAttributes)] + ) # see if any created/updated/deleted objects bump the revision # of another object @@ -54,26 +62,44 @@ class RevisionPlugin(service_base.ServicePluginBase): o for o in session.deleted | session.dirty | session.new if getattr(o, 'revises_on_change', ()) ] - for obj in objects_with_related_revisions: - self._bump_related_revisions(session, obj) + collected = session.info.setdefault('_related_bumped', set()) + self._collect_related_tobump( + session, objects_with_related_revisions, collected) - def _bump_related_revisions(self, session, obj): - for revises_col in getattr(obj, 'revises_on_change', ()): + def _emit_related_revision_bumps(self, session, context): + # within after_flush_postexec, emit an UPDATE statement to increment + # revision flags for related objects that were located in the + # before_flush phase. + # + # note that this event isn't called if the flush fails; + # in that case, the transaction is rolled back and the + # after_rollback event will invoke self._clear_rev_bumped_flags + # to clean out state. + collected = session.info.get('_related_bumped', None) + if collected: try: - related_obj = self._find_related_obj(session, obj, revises_col) + self._bump_obj_revisions( + session, collected, version_check=False) + finally: + collected.clear() + + def _collect_related_tobump(self, session, objects, collected): + for obj in objects: + if obj in collected: + continue + for revises_col in getattr(obj, 'revises_on_change', ()): + related_obj = self._find_related_obj(obj, revises_col) if not related_obj: LOG.warning("Could not find related %(col)s for " "resource %(obj)s to bump revision.", {'obj': obj, 'col': revises_col}) continue # if related object revises others, bump those as well - self._bump_related_revisions(session, related_obj) + self._collect_related_tobump(session, [related_obj], collected) # no need to bump revisions on related objects being deleted if related_obj not in session.deleted: - self._bump_obj_revision(session, related_obj) - except exc.ObjectDeletedError: - # object was in session but another writer deleted it - pass + collected.add(related_obj) + return collected def get_plugin_type(self): return "revision_plugin" @@ -87,14 +113,19 @@ class RevisionPlugin(service_base.ServicePluginBase): def extend_resource_dict_revision(resource_res, resource_db): resource_res['revision_number'] = resource_db.revision_number - def _find_related_obj(self, session, obj, relationship_col): + def _find_related_obj(self, obj, relationship_col): """Gets a related object off of a relationship. Raises a runtime error if the relationship isn't configured correctly for revision bumping. """ # first check to see if it's directly attached to the object already - related_obj = getattr(obj, relationship_col) + try: + related_obj = getattr(obj, relationship_col) + except exc.ObjectDeletedError: + # object was in session but another writer deleted it + return None + if related_obj: return related_obj for rel in sqlalchemy.inspect(obj).mapper.relationships: @@ -108,24 +139,101 @@ class RevisionPlugin(service_base.ServicePluginBase): def _clear_rev_bumped_flags(self, session): """This clears all flags on commit/rollback to enable rev bumps.""" + session.info.pop('_related_bumped', None) for inst in session: setattr(inst, '_rev_bumped', False) - def _bump_obj_revision(self, session, obj): - """Increment object revision in compare and swap fashion. + def _bump_obj_revisions(self, session, objects, version_check=True): + """Increment object revisions. + + If version_check=True, uses SQLAlchemy ORM's compare-and-swap feature + (known as "version_id_col" in the ORM mapping), which is part of the + StandardAttribute class. + + If version_check=False, runs an UPDATE statement directly against + the set of all StandardAttribute objects at once, without using + any compare and swap logic. + + If a revision number constraint rule was associated with the Session, + this is retrieved and each object is tested to see if it matches + this condition; if so, the constraint is enforced. - Before the increment, this checks and enforces any revision number - constraints. """ - if getattr(obj, '_rev_bumped', False): - # we've already bumped the revision of this object in this txn + + # filter objects for which we've already bumped the revision + to_bump = [ + obj for obj in objects if not getattr(obj, '_rev_bumped', False)] + + if not to_bump: return + + self._run_constrained_instance_match_check(session, to_bump) + + if not version_check: + # this UPDATE statement could alternatively be written to run + # as an UPDATE-per-object with Python-generated revision numbers + # as parameters. + session.query(standard_attr.StandardAttribute).filter( + standard_attr.StandardAttribute.id.in_( + [obj._effective_standard_attribute_id for obj in to_bump] + ) + ).update({ + # note that SQLAlchemy runs the onupdate function for + # the updated_at column and applies it to the SET clause as + # well. + standard_attr.StandardAttribute.revision_number: + standard_attr.StandardAttribute.revision_number + 1}, + synchronize_session=False) + + # run a SELECT to get back the new values we just generated. + # if MySQL supported RETURNING, we could get these numbers + # back from the UPDATE without running another SELECT. + retrieve_revision_numbers = { + row.id: (row.revision_number, row.updated_at) + for row in + session.query( + standard_attr.StandardAttribute.id, + standard_attr.StandardAttribute.revision_number, + standard_attr.StandardAttribute.updated_at, + ).filter( + standard_attr.StandardAttribute.id.in_( + [ + obj._effective_standard_attribute_id + for obj in to_bump + ] + ) + ) + } + + for obj in to_bump: + if version_check: + # full version check, run the ORM routine to UPDATE + # the row with a WHERE clause + obj.bump_revision() + else: + # no version check - get back what we did in our one-step + # UPDATE statement and set it without causing change in + # ORM flush state + try: + new_version_id, new_updated_at = retrieve_revision_numbers[ + obj._effective_standard_attribute_id + ] + except KeyError: + # in case the object was deleted concurrently + LOG.warning( + "No standard attr row found for resource: %(obj)s", + {'obj': obj}) + else: + obj._set_updated_revision_number( + new_version_id, new_updated_at) + setattr(obj, '_rev_bumped', True) + + def _run_constrained_instance_match_check(self, session, objects): instance, match = self._get_constrained_instance_match(session) - if instance and instance == obj: - # one last check before bumping revision - self._enforce_if_match_constraints(session) - obj.bump_revision() - setattr(obj, '_rev_bumped', True) + for obj in objects: + if instance and instance == obj: + # one last check before bumping revision + self._enforce_if_match_constraints(session) def _find_instance_by_column_value(self, session, model, column, value): """Lookup object in session or from DB based on a column's value.""" diff --git a/neutron/tests/unit/services/revisions/test_revision_plugin.py b/neutron/tests/unit/services/revisions/test_revision_plugin.py index bafd6020077..84803dbc3b7 100644 --- a/neutron/tests/unit/services/revisions/test_revision_plugin.py +++ b/neutron/tests/unit/services/revisions/test_revision_plugin.py @@ -79,10 +79,27 @@ class TestRevisionPlugin(test_plugin.Ml2PluginV2TestCase): other_ctx.session.delete( other_ctx.session.query(models_v2.Port).first() ) - # expire the port so the revision bumping code will trigger a - # lookup on its attributes and encounter an ObjectDeletedError + other_ctx.session.flush() + + # ensure no attribute lookups are attempted on an + # object deleted from the session when doing related + # bumps self.ctx.session.expire(port) - rp._bump_related_revisions(self.ctx.session, ipal_obj) + + collected = rp._collect_related_tobump( + self.ctx.session, [ipal_obj], set()) + rp._bump_obj_revisions( + self.ctx.session, collected, version_check=False) + + def test_shared_network_create(self): + # this test intends to run db_base_plugin_v2 -> create_network_db, + # which in turn creates a Network and then a NetworkRBAC object. + # An issue was observed with the revision_plugin which would interfere + # with the flush process that occurs with these two connected objects, + # creating two copies of the Network object in the Session and putting + # it into an invalid state. + with self.network(shared=True): + pass def test_port_name_update_revises(self): with self.port() as port: