Revert "Revert "Automatically expire obsolete relationships""
This reverts commit a75014792a
.
This is the second attempt to merge the patch after the previous one resulted
in revert due to multiple gate breakages in dependent projects (neutron-lbaas,
vmware-nsx, heat, networking-odl). This second attempt is validated with a set
of depends-on patches for all projects that were affected during the first
failed attempt.
The original commit message for the patch is included below for context.
===
Listen for foreign key changes and expire related relationships.
With this, we can remove OVO code that refreshes / detaches models on
each fetch. The patch also removes a bunch of expunge calls in plugin
code.
writer.using context manager is added to _get_subnets so that segment
plugin's _notify_subnet_updated handler that calls to _get_subnets
doesn't use the facade-less context.session that in specific cases may
cache old models from previous sessions when used in mixed
facade/facade-less environment.
This patch bumps SQLAlchemy minimal requirement to >= 1.2.0 because
pending_to_persistent event didn't exist before this version. It could be >=
1.1.0 if not for the fact that all 1.1.x releases have a bug that results in
breakage of test_update_with_none_and_own_mac_for_duplicate_ip due to obscure
import ordering issue in the library.
(The issue is fixed by https://github.com/zzzeek/sqlalchemy/commit/
63ff0140705207198545e3a0d7868a5ba8486e93)
Partially-Implements: blueprint enginefacade-switch
Partially-Implements: blueprint adopt-oslo-versioned-objects-for-db
Co-Authored-By: Michael Bayer <mike_mp@zzzcomputing.com>
Depends-On: If4b28110f460f6ac77ace1bbb02967ea986d4cab
Depends-On: I9f1e76cb24838533572b5fbe269ff96a24ce4af1
Change-Id: I0d65d19204da8ce30addfa5faff68544534b7853
This commit is contained in:
parent
a75014792a
commit
f69389004a
@ -131,7 +131,7 @@ snowballstemmer==1.2.1
|
||||
Sphinx==1.6.5
|
||||
sphinxcontrib-websupport==1.0.1
|
||||
sqlalchemy-migrate==0.11.0
|
||||
SQLAlchemy==1.0.10
|
||||
SQLAlchemy==1.2.0
|
||||
sqlparse==0.2.2
|
||||
statsd==3.2.1
|
||||
stestr==1.0.0
|
||||
|
@ -18,6 +18,7 @@ import copy
|
||||
import weakref
|
||||
|
||||
from neutron_lib.db import api
|
||||
from neutron_lib.db import model_base
|
||||
from neutron_lib import exceptions
|
||||
from neutron_lib.objects import exceptions as obj_exc
|
||||
from oslo_config import cfg
|
||||
@ -286,3 +287,127 @@ def load_one_to_manys(session):
|
||||
msg = ("Relationship %s attributes must be loaded in db"
|
||||
" object %s" % (relationship_attr.key, state.dict))
|
||||
raise AssertionError(msg)
|
||||
|
||||
|
||||
# Expire relationships when foreign key changes.
|
||||
#
|
||||
# NOTE(ihrachys) Arguably, it's a sqlalchemy anti-pattern to access child
|
||||
# models directly and through parent relationships in the same session. But
|
||||
# since OVO mechanism is built around synthetic fields that assume this mixed
|
||||
# access is possible, we keep it here until we find a way to migrate OVO
|
||||
# synthetic fields to better mechanism that would update child models via
|
||||
# parents. Even with that, there are multiple places in plugin code where we
|
||||
# mix access when using models directly; those occurrences would need to be
|
||||
# fixed too to be able to remove this hook and explicit expire() calls.
|
||||
#
|
||||
# Adopted from the following recipe:
|
||||
# https://bitbucket.org/zzzeek/sqlalchemy/wiki/UsageRecipes
|
||||
# /ExpireRelationshipOnFKChange
|
||||
#
|
||||
# ...then massively changed to actually work for all neutron backref cases.
|
||||
#
|
||||
# TODO(ihrachys) at some point these event handlers should be extended to also
|
||||
# automatically refresh values for expired attributes
|
||||
def expire_for_fk_change(target, fk_value, relationship_prop, column_attr):
|
||||
"""Expire relationship attributes when a many-to-one column changes."""
|
||||
|
||||
sess = orm.object_session(target)
|
||||
|
||||
# subnets and network's many-to-one relationship is used as example in the
|
||||
# comments in this function
|
||||
if sess is not None:
|
||||
# optional behavior #1 - expire the "Network.subnets"
|
||||
# collection on the existing "network" object
|
||||
if relationship_prop.back_populates and \
|
||||
relationship_prop.key in target.__dict__:
|
||||
obj = getattr(target, relationship_prop.key)
|
||||
if obj is not None and sqlalchemy.inspect(obj).persistent:
|
||||
sess.expire(obj, [relationship_prop.back_populates])
|
||||
|
||||
# optional behavior #2 - expire the "Subnet.network"
|
||||
if sqlalchemy.inspect(target).persistent:
|
||||
sess.expire(target, [relationship_prop.key])
|
||||
|
||||
# optional behavior #3 - "trick" the ORM by actually
|
||||
# setting the value ahead of time, then emitting a load
|
||||
# for the attribute so that the *new* Subnet.network
|
||||
# is loaded. Then, expire Network.subnets on *that*.
|
||||
# Other techniques here including looking in the identity
|
||||
# map for "value", if this is a simple many-to-one get.
|
||||
if relationship_prop.back_populates:
|
||||
target.__dict__[column_attr] = fk_value
|
||||
new = getattr(target, relationship_prop.key)
|
||||
if new is not None:
|
||||
if sqlalchemy.inspect(new).persistent:
|
||||
sess.expire(new, [relationship_prop.back_populates])
|
||||
else:
|
||||
# no Session yet, do it later. This path is reached from the 'expire'
|
||||
# listener setup by '_expire_prop_on_col' below, when a foreign key
|
||||
# is directly assigned to in the many to one side of a relationship.
|
||||
# i.e. assigning directly to Subnet.network_id before Subnet is added
|
||||
# to the session
|
||||
if target not in _emit_on_pending:
|
||||
_emit_on_pending[target] = []
|
||||
_emit_on_pending[target].append(
|
||||
(fk_value, relationship_prop, column_attr))
|
||||
|
||||
|
||||
_emit_on_pending = weakref.WeakKeyDictionary()
|
||||
|
||||
|
||||
@event.listens_for(orm.session.Session, "pending_to_persistent")
|
||||
def _pending_callables(session, obj):
|
||||
"""Expire relationships when a new object w/ a foreign key becomes
|
||||
persistent
|
||||
"""
|
||||
if obj is None:
|
||||
return
|
||||
args = _emit_on_pending.pop(obj, [])
|
||||
for a in args:
|
||||
if a is not None:
|
||||
expire_for_fk_change(obj, *a)
|
||||
|
||||
|
||||
@event.listens_for(orm.session.Session, "persistent_to_deleted")
|
||||
def _persistent_to_deleted(session, obj):
|
||||
"""Expire relationships when an object w/ a foreign key becomes deleted"""
|
||||
mapper = sqlalchemy.inspect(obj).mapper
|
||||
for prop in mapper.relationships:
|
||||
if prop.direction is orm.interfaces.MANYTOONE:
|
||||
for col in prop.local_columns:
|
||||
colkey = mapper.get_property_by_column(col).key
|
||||
expire_for_fk_change(obj, None, prop, colkey)
|
||||
|
||||
|
||||
@event.listens_for(model_base.BASEV2, "attribute_instrument", propagate=True)
|
||||
def _listen_for_changes(cls, key, inst):
|
||||
mapper = sqlalchemy.inspect(cls)
|
||||
if key not in mapper.relationships:
|
||||
return
|
||||
prop = inst.property
|
||||
|
||||
if prop.direction is orm.interfaces.MANYTOONE:
|
||||
for col in prop.local_columns:
|
||||
colkey = mapper.get_property_by_column(col).key
|
||||
_expire_prop_on_col(cls, prop, colkey)
|
||||
elif prop.direction is orm.interfaces.ONETOMANY:
|
||||
remote_mapper = prop.mapper
|
||||
# the collection *has* to have a MANYTOONE backref so we
|
||||
# can look up the parent. so here we make one if it doesn't
|
||||
# have it already, as is the case in this example
|
||||
if not prop.back_populates:
|
||||
name = "_%s_backref" % prop.key
|
||||
backref_prop = orm.relationship(
|
||||
prop.parent, back_populates=prop.key)
|
||||
|
||||
remote_mapper.add_property(name, backref_prop)
|
||||
prop.back_populates = name
|
||||
|
||||
|
||||
def _expire_prop_on_col(cls, prop, colkey):
|
||||
@event.listens_for(getattr(cls, colkey), "set")
|
||||
def expire(target, value, oldvalue, initiator):
|
||||
"""Expire relationships when the foreign key attribute on
|
||||
an object changes
|
||||
"""
|
||||
expire_for_fk_change(target, value, prop, colkey)
|
||||
|
@ -277,9 +277,12 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
|
||||
page_reverse=False):
|
||||
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
|
||||
filters = filters or {}
|
||||
return subnet_obj.Subnet.get_objects(context, _pager=pager,
|
||||
validate_filters=False,
|
||||
**filters)
|
||||
# TODO(ihrachys) remove explicit reader usage when subnet OVO switches
|
||||
# to engine facade by default
|
||||
with db_api.context_manager.reader.using(context):
|
||||
return subnet_obj.Subnet.get_objects(context, _pager=pager,
|
||||
validate_filters=False,
|
||||
**filters)
|
||||
|
||||
def _make_network_dict(self, network, fields=None,
|
||||
process_extensions=True, context=None):
|
||||
|
@ -413,6 +413,17 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
|
||||
context.session.add(entry)
|
||||
elif not update_shared and entry:
|
||||
network.rbac_entries.remove(entry)
|
||||
|
||||
# TODO(ihrachys) Below can be removed when we make sqlalchemy
|
||||
# event listeners in neutron/db/api.py to refresh expired
|
||||
# attributes.
|
||||
#
|
||||
# First trigger expiration of rbac_entries.
|
||||
context.session.flush()
|
||||
# Then fetch state for _make_network_dict use outside session
|
||||
# context.
|
||||
getattr(network, 'rbac_entries')
|
||||
|
||||
# The filter call removes attributes from the body received from
|
||||
# the API that are logically tied to network resources but are
|
||||
# stored in other database tables handled by extensions
|
||||
@ -798,6 +809,8 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
|
||||
network,
|
||||
subnet['subnet'],
|
||||
subnetpool_id)
|
||||
# TODO(ihrachys): make sqlalchemy refresh expired relationships
|
||||
getattr(network, 'subnets')
|
||||
result = self._make_subnet_dict(subnet, context=context)
|
||||
return result, network, ipam_subnet
|
||||
|
||||
|
@ -1393,11 +1393,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
|
||||
floatingip_obj,
|
||||
fip,
|
||||
old_floatingip)
|
||||
# Expunge it to ensure the following get_object doesn't use the
|
||||
# instance. Such as update fip qos above bumps the revision of the
|
||||
# floatingIp. It would add floatingIp object to the session.
|
||||
context.session.expunge(model_query.get_by_id(
|
||||
context, l3_models.FloatingIP, floatingip_obj.id))
|
||||
floatingip_obj = l3_obj.FloatingIP.get_object(
|
||||
context, id=floatingip_obj.id)
|
||||
floatingip_db = floatingip_obj.db_obj
|
||||
|
@ -90,11 +90,14 @@ class RbacPluginMixin(common_db_mixin.CommonDbMixin):
|
||||
except c_exc.CallbackFailure as ex:
|
||||
raise ext_rbac.RbacPolicyInUse(object_id=entry['object_id'],
|
||||
details=ex)
|
||||
# make a dict copy because deleting the entry will nullify its
|
||||
# object_id link to network
|
||||
entry_dict = dict(entry)
|
||||
with context.session.begin(subtransactions=True):
|
||||
context.session.delete(entry)
|
||||
registry.notify(resources.RBAC_POLICY, events.AFTER_DELETE, self,
|
||||
context=context, object_type=object_type,
|
||||
policy=entry)
|
||||
policy=entry_dict)
|
||||
self.object_type_cache.pop(id, None)
|
||||
|
||||
def _get_rbac_policy(self, context, id):
|
||||
|
@ -121,12 +121,6 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
|
||||
sg.obj_reset_changes(['rules'])
|
||||
|
||||
# fetch sg from db to load the sg rules with sg model.
|
||||
# NOTE(yamamoto): Adding rules above bumps the revision
|
||||
# of the SG. It would add SG object to the session.
|
||||
# Expunge it to ensure the following get_object doesn't
|
||||
# use the instance.
|
||||
context.session.expunge(model_query.get_by_id(
|
||||
context, sg_models.SecurityGroup, sg.id))
|
||||
sg = sg_obj.SecurityGroup.get_object(context, id=sg.id)
|
||||
secgroup_dict = self._make_security_group_dict(sg)
|
||||
kwargs['security_group'] = secgroup_dict
|
||||
|
@ -305,21 +305,11 @@ class NeutronObject(obj_base.VersionedObject,
|
||||
context, validate_filters=validate_filters, **kwargs))
|
||||
|
||||
|
||||
def _detach_db_obj(func):
|
||||
"""Decorator to detach db_obj from the session."""
|
||||
def _guarantee_rw_subtransaction(func):
|
||||
@functools.wraps(func)
|
||||
def decorator(self, *args, **kwargs):
|
||||
synthetic_changed = bool(self._get_changed_synthetic_fields())
|
||||
with self.db_context_writer(self.obj_context):
|
||||
res = func(self, *args, **kwargs)
|
||||
# some relationship based fields may be changed since we captured
|
||||
# the model, let's refresh it for the latest database state
|
||||
if synthetic_changed:
|
||||
# TODO(ihrachys) consider refreshing just changed attributes
|
||||
self.obj_context.session.refresh(self.db_obj)
|
||||
# detach the model so that consequent fetches don't reuse it
|
||||
self.obj_context.session.expunge(self.db_obj)
|
||||
return res
|
||||
return func(self, *args, **kwargs)
|
||||
return decorator
|
||||
|
||||
|
||||
@ -361,9 +351,8 @@ class DeclarativeObject(abc.ABCMeta):
|
||||
for key in model_unique_key]
|
||||
if obj_field_names.issuperset(obj_unique_key):
|
||||
cls.unique_keys.append(obj_unique_key)
|
||||
# detach db_obj right after object is loaded from the model
|
||||
cls.create = _detach_db_obj(cls.create)
|
||||
cls.update = _detach_db_obj(cls.update)
|
||||
cls.create = _guarantee_rw_subtransaction(cls.create)
|
||||
cls.update = _guarantee_rw_subtransaction(cls.update)
|
||||
|
||||
if (hasattr(cls, 'has_standard_attributes') and
|
||||
cls.has_standard_attributes()):
|
||||
@ -500,8 +489,6 @@ class NeutronDbObject(NeutronObject):
|
||||
def _load_object(cls, context, db_obj):
|
||||
obj = cls(context)
|
||||
obj.from_db_object(db_obj)
|
||||
# detach the model so that consequent fetches don't reuse it
|
||||
context.session.expunge(obj.db_obj)
|
||||
return obj
|
||||
|
||||
def obj_load_attr(self, attrname):
|
||||
@ -689,14 +676,6 @@ class NeutronDbObject(NeutronObject):
|
||||
del fields[field]
|
||||
return fields
|
||||
|
||||
def _get_changed_synthetic_fields(self):
|
||||
fields = self.obj_get_changes()
|
||||
fields = get_updatable_fields(self, fields)
|
||||
for field in self._get_changed_persistent_fields():
|
||||
if field in fields:
|
||||
del fields[field]
|
||||
return fields
|
||||
|
||||
def _validate_changed_fields(self, fields):
|
||||
fields = fields.copy()
|
||||
forbidden_updates = set(self.fields_no_update) & set(fields.keys())
|
||||
|
@ -159,7 +159,6 @@ class TestAgentsDbMixin(TestAgentsDbBase):
|
||||
mock.patch(
|
||||
'neutron.objects.base.NeutronDbObject.modify_fields_from_db'
|
||||
).start()
|
||||
mock.patch.object(self.context.session, 'expunge').start()
|
||||
|
||||
with mock.patch('neutron.objects.db.api.create_object') as add_mock:
|
||||
add_mock.side_effect = [
|
||||
|
@ -482,6 +482,10 @@ class FlavorPluginTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
|
||||
flavor = {'flavor': {'name': 'Silver',
|
||||
'enabled': False}}
|
||||
self.plugin.update_flavor(self.ctx, fl['id'], flavor)
|
||||
|
||||
# don't reuse cached models from previous plugin call
|
||||
self.ctx.session.expire_all()
|
||||
|
||||
res = flavor_obj.Flavor.get_object(self.ctx, id=fl['id'])
|
||||
self.assertEqual('Silver', res['name'])
|
||||
self.assertFalse(res['enabled'])
|
||||
@ -560,6 +564,10 @@ class FlavorPluginTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
|
||||
data['service_profile']['metainfo'] = '{"data": "value1"}'
|
||||
sp = self.plugin.update_service_profile(self.ctx, sp['id'],
|
||||
data)
|
||||
|
||||
# don't reuse cached models from previous plugin call
|
||||
self.ctx.session.expire_all()
|
||||
|
||||
res = flavor_obj.ServiceProfile.get_object(self.ctx, id=sp['id'])
|
||||
self.assertEqual(data['service_profile']['metainfo'], res['metainfo'])
|
||||
|
||||
@ -591,6 +599,9 @@ class FlavorPluginTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
|
||||
self.assertEqual(fl['id'], binding['flavor_id'])
|
||||
self.assertEqual(sp['id'], binding['service_profile_id'])
|
||||
|
||||
# don't reuse cached models from previous plugin call
|
||||
self.ctx.session.expire_all()
|
||||
|
||||
res = self.plugin.get_flavor(self.ctx, fl['id'])
|
||||
self.assertEqual(1, len(res['service_profiles']))
|
||||
self.assertEqual(sp['id'], res['service_profiles'][0])
|
||||
|
@ -713,27 +713,6 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
|
||||
self.model_map = collections.defaultdict(list)
|
||||
self.model_map[self._test_class.db_model] = self.db_objs
|
||||
self.pager_map = collections.defaultdict(lambda: None)
|
||||
# don't validate refresh and expunge in tests that don't touch database
|
||||
# because otherwise it will fail due to db models not being injected
|
||||
# into active session in the first place
|
||||
mock.patch.object(self.context.session, 'refresh').start()
|
||||
mock.patch.object(self.context.session, 'expunge').start()
|
||||
|
||||
# don't validate expunge in tests that don't touch database and use
|
||||
# new reader engine facade
|
||||
self.reader_facade_mock = mock.patch.object(
|
||||
self._test_class, 'db_context_reader').start()
|
||||
mock.patch.object(self.reader_facade_mock.return_value.session,
|
||||
'expunge').start()
|
||||
|
||||
# don't validate refresh and expunge in tests that don't touch database
|
||||
# and use new writer engine facade
|
||||
self.writer_facade_mock = mock.patch.object(
|
||||
self._test_class, 'db_context_writer').start()
|
||||
mock.patch.object(self.writer_facade_mock.return_value.session,
|
||||
'expunge').start()
|
||||
mock.patch.object(self.writer_facade_mock.return_value.session,
|
||||
'refresh').start()
|
||||
|
||||
self.get_objects_mock = mock.patch.object(
|
||||
obj_db_api, 'get_objects',
|
||||
|
@ -59,8 +59,6 @@ class TestLoggingPlugin(base.BaseLogTestCase):
|
||||
'LoggingServiceDriverManager.supported_logging_types',
|
||||
new_callable=log_types).start()
|
||||
self.ctxt = context.Context('admin', 'fake_tenant')
|
||||
mock.patch.object(self.ctxt.session, 'refresh').start()
|
||||
mock.patch.object(self.ctxt.session, 'expunge').start()
|
||||
|
||||
def test_get_logs(self):
|
||||
with mock.patch.object(log_object.Log, 'get_objects')\
|
||||
|
@ -69,8 +69,6 @@ class TestQosPlugin(base.BaseQosTestCase):
|
||||
|
||||
self.ctxt = context.Context('fake_user', 'fake_tenant')
|
||||
self.admin_ctxt = context.get_admin_context()
|
||||
mock.patch.object(self.ctxt.session, 'refresh').start()
|
||||
mock.patch.object(self.ctxt.session, 'expunge').start()
|
||||
|
||||
self.policy_data = {
|
||||
'policy': {'id': uuidutils.generate_uuid(),
|
||||
|
@ -18,7 +18,7 @@ neutron-lib>=1.13.0 # Apache-2.0
|
||||
python-neutronclient>=6.7.0 # Apache-2.0
|
||||
tenacity>=3.2.1 # Apache-2.0
|
||||
ryu>=4.24 # Apache-2.0
|
||||
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
|
||||
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.2.0 # MIT
|
||||
WebOb>=1.7.1 # MIT
|
||||
keystoneauth1>=3.4.0 # Apache-2.0
|
||||
alembic>=0.8.10 # MIT
|
||||
|
Loading…
Reference in New Issue
Block a user