Merge "Fix relationship event handler for flushes and nested"
This commit is contained in:
commit
6ca8a186ae
|
@ -15,6 +15,7 @@
|
|||
|
||||
import contextlib
|
||||
import copy
|
||||
import weakref
|
||||
|
||||
from debtcollector import removals
|
||||
from neutron_lib.db import api
|
||||
|
@ -264,12 +265,25 @@ def sqla_remove_all():
|
|||
del _REGISTERED_SQLA_EVENTS[:]
|
||||
|
||||
|
||||
@event.listens_for(orm.session.Session, "after_flush")
|
||||
def add_to_rel_load_list(session, flush_context=None):
|
||||
# keep track of new items to load relationships on during commit
|
||||
session.info.setdefault('_load_rels', weakref.WeakSet()).update(
|
||||
session.new)
|
||||
|
||||
|
||||
@event.listens_for(orm.session.Session, "before_commit")
|
||||
def load_one_to_manys(session):
|
||||
# TODO(kevinbenton): we should be able to remove this after we
|
||||
# have eliminated all places where related objects are constructed
|
||||
# using a key rather than a relationship.
|
||||
for new_object in session.new:
|
||||
|
||||
add_to_rel_load_list(session) # capture any new objects
|
||||
if session.transaction.nested:
|
||||
# wait until final commit
|
||||
return
|
||||
|
||||
for new_object in session.info.pop('_load_rels', []):
|
||||
state = sqlalchemy.inspect(new_object)
|
||||
|
||||
# set up relationship loading so that we can call lazy
|
||||
|
|
|
@ -55,6 +55,7 @@ from neutron.db import ipam_backend_mixin
|
|||
from neutron.db.models import l3 as l3_models
|
||||
from neutron.db.models import securitygroup as sg_models
|
||||
from neutron.db import models_v2
|
||||
from neutron.db import rbac_db_models
|
||||
from neutron.db import standard_attr
|
||||
from neutron.ipam import exceptions as ipam_exc
|
||||
from neutron.tests import base
|
||||
|
@ -6062,13 +6063,29 @@ class TestSubnetPoolsV2(NeutronDbPluginV2TestCase):
|
|||
class DbModelMixin(object):
|
||||
"""DB model tests."""
|
||||
def test_make_network_dict_outside_engine_facade_manager(self):
|
||||
mock.patch.object(directory, 'get_plugin').start()
|
||||
ctx = context.get_admin_context()
|
||||
with db_api.context_manager.writer.using(ctx):
|
||||
network = models_v2.Network(name="net_net", status="OK",
|
||||
admin_state_up=True)
|
||||
ctx.session.add(network)
|
||||
with db_api.autonested_transaction(ctx.session):
|
||||
sg = sg_models.SecurityGroup(name='sg', description='sg')
|
||||
ctx.session.add(sg)
|
||||
# ensure db rels aren't loaded until commit for network object
|
||||
# by sharing after a nested transaction
|
||||
ctx.session.add(
|
||||
rbac_db_models.NetworkRBAC(object_id=network.id,
|
||||
action='access_as_shared',
|
||||
tenant_id=network.tenant_id,
|
||||
target_tenant='*')
|
||||
)
|
||||
net2 = models_v2.Network(name="net_net2", status="OK",
|
||||
admin_state_up=True)
|
||||
ctx.session.add(net2)
|
||||
pl = db_base_plugin_common.DbBasePluginCommon()
|
||||
self.assertFalse(pl._make_network_dict(network, context=ctx)['shared'])
|
||||
self.assertTrue(pl._make_network_dict(network, context=ctx)['shared'])
|
||||
self.assertFalse(pl._make_network_dict(net2, context=ctx)['shared'])
|
||||
|
||||
def test_repr(self):
|
||||
"""testing the string representation of 'model' classes."""
|
||||
|
|
|
@ -237,6 +237,7 @@ class TestL3GwModeMixin(testlib_api.SqlTestCase):
|
|||
self.context.session.add(self.fip_int_ip_info)
|
||||
self.context.session.add(self.fip)
|
||||
self.context.session.flush()
|
||||
self.context.session.expire_all()
|
||||
self.fip_request = {'port_id': FAKE_FIP_INT_PORT_ID,
|
||||
'tenant_id': self.tenant_id}
|
||||
|
||||
|
|
Loading…
Reference in New Issue