use context manager from neutron-lib

Change-Id: Iee1dc60862b12d48104325d2798fb881c5edcc9d
(cherry picked from commit d0d6094bee)
Pulkit vajpayee 2023-09-15 04:03:36 +00:00 committed by pulkit vajpayee
parent 042faad003
commit a8df8df012
16 changed files with 1372 additions and 1061 deletions
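The pattern applied throughout this change is mechanical: transactions that were opened directly on the session with session.begin(subtransactions=True) now go through the neutron-lib enginefacade reader/writer context managers (exposed both by neutron_lib.db.api and by the gbpservice.neutron.db.api wrapper imported as db_api in the hunks below). A minimal before/after sketch, assuming only that neutron-lib is installed; the policy-target-group object and attribute are hypothetical stand-ins:

from neutron_lib import context as n_context
from neutron_lib.db import api as db_api


def set_l2_policy_old(context, ptg_db, l2p_id):
    # Old pattern: open a (sub)transaction directly on the raw session.
    with context.session.begin(subtransactions=True):
        ptg_db.l2_policy_id = l2p_id


def set_l2_policy_new(context, ptg_db, l2p_id):
    # New pattern: the writer facade opens (or joins) the transaction and
    # manages context.session for the duration of the block.
    with db_api.CONTEXT_WRITER.using(context):
        ptg_db.l2_policy_id = l2p_id


def read_only_example(model):
    # Standalone reader/writer sessions are likewise replaced by an admin
    # context plus the matching facade; ctx.session is used inside.
    ctx = n_context.get_admin_context()
    with db_api.CONTEXT_READER.using(ctx):
        return ctx.session.query(model).all()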

View File

@ -445,7 +445,13 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
def _get_l3_policy(self, context, l3_policy_id):
try:
return db_api.get_by_id(context, L3Policy, l3_policy_id)
l3p_db = db_api.get_by_id(context, L3Policy, l3_policy_id)
# REVISIT: l3p_db is a DB object. It cannot be used outside the
# session because some callers then fail with an error that
# l3p_db must be bound to a session. Hence, reassigning.
for l3p in l3p_db:
l3p_db[l3p[0]] = l3p[1]
return l3p_db
except exc.NoResultFound:
raise gpolicy.L3PolicyNotFound(l3_policy_id=l3_policy_id)
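The REVISIT above relies on the dict-like interface of Neutron DB models: iterating the object yields (column name, value) pairs, and item assignment maps to setattr. Re-assigning every column while the session is still active forces all attributes to load, so the returned object stays readable after the session ends. A rough, hypothetical equivalent of that idiom:

def _load_all_columns(db_obj):
    # Touch every column inside the active session so the values are
    # cached on the instance and survive detachment from the session.
    for name, value in db_obj:
        db_obj[name] = value
    return db_obj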
@ -529,7 +535,7 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
if not action_id_list:
pr_db.policy_actions = []
return
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
# We will first check if the new list of actions is valid
filters = {'id': [a_id for a_id in action_id_list]}
actions_in_db = db_api.get_collection_query(context, PolicyAction,
@ -557,7 +563,7 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
def _validate_policy_rule_set_list(self, context,
policy_rule_sets_id_list):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(context):
filters = {'id': [c_id for c_id in policy_rule_sets_id_list]}
policy_rule_sets_in_db = db_api.get_collection_query(
context, PolicyRuleSet, filters=filters)
@ -599,7 +605,7 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
else:
db_res.consumed_policy_rule_sets = []
return
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
policy_rule_sets_id_list = list(policy_rule_sets_dict.keys())
# We will first check if the new list of policy_rule_sets is valid
self._validate_policy_rule_set_list(
@ -630,7 +636,7 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
# Only one hierarchy level allowed for now
raise gpolicy.ThreeLevelPolicyRuleSetHierarchyNotSupported(
policy_rule_set_id=policy_rule_set_db['id'])
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
# We will first check if the new list of policy_rule_sets is valid
policy_rule_sets_in_db = self._validate_policy_rule_set_list(
@ -657,7 +663,7 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
if not rule_id_list:
prs_db.policy_rules = []
return
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
# We will first check if the new list of rules is valid
filters = {'id': [r_id for r_id in rule_id_list]}
rules_in_db = db_api.get_collection_query(context, PolicyRule,
@ -702,24 +708,24 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
return res
def _set_l3_policy_for_l2_policy(self, context, l2p_id, l3p_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
l2p_db = self._get_l2_policy(context, l2p_id)
l2p_db.l3_policy_id = l3p_id
def _set_l2_policy_for_policy_target_group(self, context, ptg_id, l2p_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
ptg_db = self._get_policy_target_group(context, ptg_id)
ptg_db.l2_policy_id = l2p_id
def _set_application_policy_group_for_policy_target_group(
self, context, ptg_id, apg_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
ptg_db = self._get_policy_target_group(context, ptg_id)
ptg_db.application_policy_group_id = apg_id
def _set_network_service_policy_for_policy_target_group(
self, context, ptg_id, nsp_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
ptg_db = self._get_policy_target_group(context, ptg_id)
ptg_db.network_service_policy_id = nsp_id
@ -730,7 +736,7 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
if not params:
nsp_db.network_service_params = []
return
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
nsp_db.network_service_params = []
for param in params:
param_db = NetworkServiceParam(
@ -744,7 +750,7 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
if not es_id_list:
ep_db.external_segments = []
return
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
filters = {'id': es_id_list}
eps_in_db = db_api.get_collection_query(
context, ExternalSegment, filters=filters)
@ -776,7 +782,7 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
if not es_dict:
l3p_db.external_segments = []
return
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
# Validate ESs exist
es_set = set(es_dict.keys())
filters = {'id': es_set}
@ -934,7 +940,7 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase):
res['child_policy_rule_sets'] = [
child_prs['id'] for child_prs in prs['child_policy_rule_sets']]
else:
with ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(ctx):
filters = {'parent_id': [prs['id']]}
child_prs_in_db = db_api.get_collection_query(
ctx, PolicyRuleSet, filters=filters)

View File

@ -153,7 +153,7 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
def _get_subnetpools(self, id_list):
context = get_current_context().elevated()
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(context):
filters = {'id': id_list}
return db_api.get_collection_query(
context, models_v2.SubnetPool, filters=filters).all()
@ -196,12 +196,12 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
return db_api.resource_fields(res, fields)
def _set_port_for_policy_target(self, context, pt_id, port_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
pt_db = self._get_policy_target(context, pt_id)
pt_db.port_id = port_id
def _add_subnet_to_policy_target_group(self, context, ptg_id, subnet_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
ptg_db = self._get_policy_target_group(context, ptg_id)
assoc = PTGToSubnetAssociation(policy_target_group_id=ptg_id,
subnet_id=subnet_id)
@ -209,14 +209,14 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
return [subnet.subnet_id for subnet in ptg_db.subnets]
def _remove_subnets_from_policy_target_groups(self, context, subnet_ids):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
assocs = context.session.query(PTGToSubnetAssociation).filter(
PTGToSubnetAssociation.subnet_id.in_(subnet_ids)).all()
for assoc in assocs:
context.session.delete(assoc)
def _remove_subnets_from_policy_target_group(self, context, ptg_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
ptg_db = self._get_policy_target_group(context, ptg_id)
assocs = (context.session.query(PTGToSubnetAssociation).
filter_by(policy_target_group_id=ptg_id).all())
@ -226,13 +226,13 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
return []
def _set_network_for_l2_policy(self, context, l2p_id, network_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
l2p_db = self._get_l2_policy(context, l2p_id)
l2p_db.network_id = network_id
def _set_address_scope_for_l3_policy_by_id(
self, context, l3p_id, address_scope_id, ip_version=4):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
l3p_db = self._get_l3_policy(context, l3p_id)
self._set_address_scope_for_l3_policy(
context, l3p_db, address_scope_id, ip_version)
@ -242,7 +242,7 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
if not address_scope_id:
return
# TODO(Sumit): address_scope_id validation
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
if ip_version == 4:
l3p_db.address_scope_v4_id = address_scope_id
else:
@ -250,7 +250,7 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
def _add_subnetpool_to_l3_policy_by_id(self, context, l3p_id,
subnetpool_id, ip_version=4):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
l3p_db = self._get_l3_policy(context, l3p_id)
return self._add_subnetpool_to_l3_policy(
context, l3p_db, subnetpool_id, ip_version)
@ -258,7 +258,7 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
def _add_subnetpool_to_l3_policy(self, context, l3p_db,
subnetpool_id, ip_version=4):
# TODO(Sumit): subnetpool_id validation
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
if ip_version == 4:
assoc = L3PolicySubnetpoolV4Association(
l3_policy_id=l3p_db['id'], subnetpool_id=subnetpool_id)
@ -272,7 +272,7 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
def _add_subnetpools_to_l3_policy_by_id(self, context, l3p_id,
subnetpools, ip_version=4):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
l3p_db = self._get_l3_policy(context, l3p_id)
self._add_subnetpools_to_l3_policy(
context, l3p_db, subnetpools, ip_version)
@ -285,7 +285,7 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
def _remove_subnetpool_from_l3_policy(self, context, l3p_id,
subnetpool_id, ip_version=4):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
l3p_db = self._get_l3_policy(context, l3p_id)
if ip_version == 4:
assoc = (context.session.query(
@ -310,7 +310,7 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
# there is no PT present on a subnet which belongs to that subnetpool
if not subnetpools:
return
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
l3p_db = self._get_l3_policy(context, l3p_id)
new_subnetpools = set(subnetpools)
if ip_version == 4:
@ -342,7 +342,7 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
context.session.delete(assoc)
def _add_router_to_l3_policy(self, context, l3p_id, router_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
l3p_db = self._get_l3_policy(context, l3p_id)
assoc = L3PolicyRouterAssociation(l3_policy_id=l3p_id,
router_id=router_id)
@ -350,7 +350,7 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
return [router.router_id for router in l3p_db.routers]
def _remove_router_from_l3_policy(self, context, l3p_id, router_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
l3p_db = self._get_l3_policy(context, l3p_id)
assoc = (context.session.query(L3PolicyRouterAssociation).
filter_by(l3_policy_id=l3p_id, router_id=router_id).
@ -360,17 +360,17 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
return [router.router_id for router in l3p_db.routers]
def _set_subnet_to_es(self, context, es_id, subnet_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
es_db = self._get_external_segment(context, es_id)
es_db.subnet_id = subnet_id
def _set_subnet_to_np(self, context, np_id, subnet_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
np_db = self._get_nat_pool(context, np_id)
np_db.subnet_id = subnet_id
def _update_ess_for_l3p(self, context, l3p_id, ess):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
l3p_db = self._get_l3_policy(context, l3p_id)
self._set_ess_for_l3p(context, l3p_db, ess)
@ -391,7 +391,7 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
return mapping['l3_policy_id']
def _set_db_np_subnet(self, context, nat_pool, subnet_id):
with context.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(context):
nat_pool['subnet_id'] = subnet_id
db_np = self._get_nat_pool(context, nat_pool['id'])
db_np.subnet_id = nat_pool['subnet_id']

View File

@ -31,6 +31,7 @@ from aim.api import resource as aim_resource
from aim import context as aim_context
from aim import utils as aim_utils
from alembic import util as alembic_util
from gbpservice.neutron.db import api as db_api
from neutron.db.migration.cli import CONF
from neutron.db.models import address_scope as as_db
from neutron.db.models import securitygroup as sg_models
@ -132,7 +133,7 @@ def do_apic_aim_persist_migration(session):
aim_ctx = aim_context.AimContext(session)
mapper = apic_mapper.APICNameMapper()
with session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(session):
# Migrate address scopes.
scope_dbs = (session.query(as_db.AddressScope)
.options(lazyload('*')).all())
@ -269,7 +270,7 @@ def do_ap_name_change(session, conf=None):
aim_ctx = aim_context.AimContext(session)
system_id = cfg.apic_system_id
alembic_util.msg("APIC System ID: %s" % system_id)
with session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(session):
net_dbs = session.query(models_v2.Network).options(lazyload('*')).all()
for net_db in net_dbs:
ext_db = _get_network_extn_db(session, net_db.id)
@ -361,7 +362,7 @@ def do_apic_aim_security_group_migration(session):
aim = aim_manager.AimManager()
aim_ctx = aim_context.AimContext(session)
mapper = apic_mapper.APICNameMapper()
with session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(session):
# Migrate SG.
sg_dbs = (session.query(sg_models.SecurityGroup).
options(lazyload('*')).all())
@ -433,7 +434,7 @@ def do_sg_rule_remote_group_id_insertion(session):
aim = aim_manager.AimManager()
aim_ctx = aim_context.AimContext(session)
mapper = apic_mapper.APICNameMapper()
with session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(session):
sg_rule_dbs = (session.query(sg_models.SecurityGroupRule).
options(lazyload('*')).all())
for sg_rule_db in sg_rule_dbs:
@ -463,7 +464,7 @@ def do_ha_ip_duplicate_entries_removal(session):
haip_ip = HAIPAddressToPortAssociation.c.ha_ip_address
haip_port_id = HAIPAddressToPortAssociation.c.port_id
with session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(session):
port_and_haip_dbs = (session.query(models_v2.Port,
HAIPAddressToPortAssociation).join(
HAIPAddressToPortAssociation,
@ -495,7 +496,7 @@ def do_ha_ip_network_id_insertion(session):
haip_ip = HAIPAddressToPortAssociation.c.ha_ip_address
haip_port_id = HAIPAddressToPortAssociation.c.port_id
with session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(session):
haip_ip = HAIPAddressToPortAssociation.c.ha_ip_address
haip_port_id = HAIPAddressToPortAssociation.c.port_id
port_and_haip_dbs = (session.query(models_v2.Port,

View File

@ -329,20 +329,22 @@ class DbMixin(object):
# HAIPAddressToPortAssociation functions.
def _get_ha_ipaddress(self, ipaddress, network_id, session=None):
session = session or db_api.get_reader_session()
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
query = BAKERY(lambda s: s.query(
HAIPAddressToPortAssociation))
query += lambda q: q.filter_by(
ha_ip_address=sa.bindparam('ipaddress'),
network_id=sa.bindparam('network_id'))
return query(session).params(
return query(ctx.session).params(
ipaddress=ipaddress, network_id=network_id).first()
def get_port_for_ha_ipaddress(self, ipaddress, network_id,
session=None):
"""Returns the Neutron Port ID for the HA IP Addresss."""
session = session or db_api.get_reader_session()
ctx = n_context.get_admin_context()
query = BAKERY(lambda s: s.query(
HAIPAddressToPortAssociation))
query += lambda q: q.join(
@ -355,20 +357,30 @@ class DbMixin(object):
sa.bindparam('network_id'))
query += lambda q: q.filter(
models_v2.Port.network_id == sa.bindparam('network_id'))
if session:
port_ha_ip = query(session).params(
ipaddress=ipaddress, network_id=network_id).first()
else:
with db_api.CONTEXT_READER.using(ctx):
port_ha_ip = query(ctx.session).params(
ipaddress=ipaddress, network_id=network_id).first()
return port_ha_ip
def get_ha_ipaddresses_for_port(self, port_id, session=None):
"""Returns the HA IP Addressses associated with a Port."""
session = session or db_api.get_reader_session()
ctx = n_context.get_admin_context()
query = BAKERY(lambda s: s.query(
HAIPAddressToPortAssociation))
query += lambda q: q.filter_by(
port_id=sa.bindparam('port_id'))
if session:
objs = query(session).params(
port_id=port_id).all()
else:
with db_api.CONTEXT_READER.using(ctx):
objs = query(ctx.session).params(
port_id=port_id).all()
# REVISIT: Do the sorting in the UT?
return sorted([x['ha_ip_address'] for x in objs])
@ -376,9 +388,9 @@ class DbMixin(object):
def update_port_id_for_ha_ipaddress(self, port_id, ipaddress, network_id,
session=None):
"""Stores a Neutron Port Id as owner of HA IP Addr (idempotent API)."""
session = session or db_api.get_writer_session()
ctx = session or n_context.get_admin_context()
try:
with session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(ctx) as session:
obj = self._get_ha_ipaddress(ipaddress, network_id, session)
if obj:
haip_ip = HAIPAddressToPortAssociation.ha_ip_address
@ -401,8 +413,8 @@ class DbMixin(object):
def delete_port_id_for_ha_ipaddress(self, port_id, ipaddress,
session=None):
session = session or db_api.get_writer_session()
with session.begin(subtransactions=True):
ctx = session or n_context.get_admin_context()
with db_api.CONTEXT_WRITER.using(ctx) as session:
try:
# REVISIT: Can this query be baked? The
# sqlalchemy.ext.baked.Result class does not have a
@ -418,11 +430,12 @@ class DbMixin(object):
# REVISIT: This method is only called from unit tests.
def get_ha_port_associations(self):
session = db_api.get_reader_session()
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
query = BAKERY(lambda s: s.query(
HAIPAddressToPortAssociation))
return query(session).all()
return query(ctx.session).all()
# REVISIT: Move this method to the mechanism_driver or rpc module,
# as it is above the DB level. This will also require some rework
@ -446,17 +459,17 @@ class DbMixin(object):
if not ipa:
continue
try:
session = db_api.get_writer_session()
with session.begin(subtransactions=True):
ctx = n_context.get_admin_context()
with db_api.CONTEXT_WRITER.using(ctx):
net_id = port['network_id']
old_owner = self.get_port_for_ha_ipaddress(
ipa, network_id or net_id,
session=session)
session=ctx.session)
old_owner_port_id = None
if old_owner:
old_owner_port_id = old_owner['port_id']
self.update_port_id_for_ha_ipaddress(
port_id, ipa, net_id, session)
port_id, ipa, net_id, ctx.session)
if old_owner_port_id and old_owner_port_id != port_id:
ports_to_update.add(old_owner_port_id)
except db_exc.DBReferenceError as dbe:
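The DbMixin changes above all follow one shape: instead of grabbing a standalone reader or writer session, build an admin context, run the (baked) query inside the matching enginefacade block, and still honor a caller-supplied session for backward compatibility. A condensed sketch of that shape; the model and filter column are hypothetical:

from neutron_lib import context as n_context
from neutron_lib.db import api as db_api


def _get_rows_for_port(model, port_id, session=None):
    # Reuse the caller's session (and its transaction) when one is given;
    # otherwise open a reader transaction on a fresh admin context.
    if session:
        return session.query(model).filter_by(port_id=port_id).all()
    ctx = n_context.get_admin_context()
    with db_api.CONTEXT_READER.using(ctx):
        return ctx.session.query(model).filter_by(port_id=port_id).all()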

View File

@ -56,7 +56,7 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as n_constants
from neutron_lib import context as nctx
from neutron_lib import context as n_context
from neutron_lib import exceptions as n_exceptions
from neutron_lib.plugins import constants as pconst
from neutron_lib.plugins import directory
@ -186,9 +186,13 @@ class KeystoneNotificationEndpoint(object):
# we only update tenants which have been created in APIC. For other
# cases, their nameAlias will be set when the first resource is
# being created under that tenant
session = db_api.get_writer_session()
tenant_aname = self._driver.name_mapper.project(session, tenant_id)
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
# TODO(pulkit): replace with AIM writer context once API
# supports it.
with db_api.CONTEXT_WRITER.using(ctx):
tenant_aname = self._driver.name_mapper.project(ctx.session,
tenant_id)
aim_ctx = aim_context.AimContext(ctx.session)
tenant = aim_resource.Tenant(name=tenant_aname)
if not self._driver.aim.get(aim_ctx, tenant):
return None
@ -210,9 +214,13 @@ class KeystoneNotificationEndpoint(object):
self._driver.project_details_cache.purge_gbp(tenant_id)
# delete the tenant and AP in AIM also
session = db_api.get_writer_session()
tenant_aname = self._driver.name_mapper.project(session, tenant_id)
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
# TODO(pulkit): replace with AIM writer context once API
# supports it.
with db_api.CONTEXT_WRITER.using(ctx):
tenant_aname = self._driver.name_mapper.project(ctx.session,
tenant_id)
aim_ctx = aim_context.AimContext(ctx.session)
tenant = aim_resource.Tenant(name=tenant_aname)
self._driver.aim.delete(aim_ctx, tenant, cascade=True)
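Both Keystone notification handlers now build the AIM context from a session opened through the writer facade on an admin context (per the TODO, until the AIM API grows its own writer context). A condensed sketch of that wrapper; the handler callable is hypothetical and the AIM calls are elided:

from aim import context as aim_context
from neutron_lib import context as n_context
from neutron_lib.db import api as db_api


def run_with_aim_ctx(handler):
    # The handler receives an AimContext bound to the session of the
    # writer transaction, mirroring the pattern in the hunks above.
    ctx = n_context.get_admin_context()
    with db_api.CONTEXT_WRITER.using(ctx):
        return handler(aim_context.AimContext(ctx.session))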
@ -303,7 +311,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
def _update_nova_vm_name_cache(self):
current_time = datetime.now()
context = nctx.get_admin_context()
context = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(context) as session:
vm_name_update = self._get_vm_name_update(session)
is_full_update = True
@ -451,8 +459,10 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
@db_api.retry_db_errors
def _ensure_static_resources(self):
session = db_api.get_writer_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
# TODO(pulkit): replace with AIM writer context once API supports it.
with db_api.CONTEXT_WRITER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
self._ensure_common_tenant(aim_ctx)
self._ensure_unrouted_vrf(aim_ctx)
self._ensure_any_filter(aim_ctx)
@ -2927,7 +2937,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
def update_summary_resource(self, session, resources, network_id):
for resource in resources:
if type(resource) == aim_resource.SpanVepgSummary:
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
query = BAKERY(lambda s: s.query(
models_v2.Network))
query += lambda q: q.filter_by(
@ -3075,7 +3085,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
raise exceptions.InvalidPortForErspanSession()
# Not supported on SVI networks
ctx = nctx.get_admin_context()
ctx = n_context.get_admin_context()
net_db = self.plugin._get_network(ctx, port['network_id'])
if self._is_svi_db(net_db):
raise exceptions.InvalidNetworkForErspanSession()
@ -3234,7 +3244,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
new_subnets = list(set(curr) - set(orig))
if not new_subnets:
return
ctx = nctx.get_admin_context()
ctx = n_context.get_admin_context()
net_db = self.plugin._get_network(ctx, original_port['network_id'])
if self._is_svi_db(net_db):
self._update_svi_ports_for_added_subnet(ctx, new_subnets,
@ -3471,6 +3481,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
context._plugin_context, router_db, context.current, subnets)
def _delete_router_port(self, session, port_id):
with db_api.CONTEXT_WRITER.using(session):
query = BAKERY(lambda s: s.query(
l3_db.RouterPort))
query += lambda q: q.filter_by(
@ -4013,7 +4024,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
if not self._dvs_notifier:
self._dvs_notifier = importutils.import_object(
DVS_AGENT_KLASS,
nctx.get_admin_context_without_session()
n_context.get_admin_context_without_session()
)
return self._dvs_notifier
@ -5004,11 +5015,13 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
ports_to_notify = [port_id]
fixed_ips = [x['ip_address'] for x in port['fixed_ips']]
if fixed_ips:
with db_api.CONTEXT_READER.using(plugin_context):
query = BAKERY(lambda s: s.query(
n_addr_pair_db.AllowedAddressPair))
query += lambda q: q.join(
models_v2.Port,
models_v2.Port.id == n_addr_pair_db.AllowedAddressPair.port_id)
models_v2.Port.id == (
n_addr_pair_db.AllowedAddressPair.port_id))
query += lambda q: q.filter(
models_v2.Port.network_id == sa.bindparam('network_id'))
addr_pair = query(plugin_context.session).params(

View File

@ -187,7 +187,7 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
# current context, which should be available in
# the session.info's dictionary, with a key of
# 'using_context').
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
plugin.extension_manager.extend_network_dict(
session, netdb, result)
else:
@ -202,7 +202,7 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
plugin = directory.get_plugin()
session = db_api.get_session_from_obj(netdb)
if session and session.is_active:
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
plugin.extension_manager.extend_network_dict_bulk(session,
results)
else:
@ -221,7 +221,7 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
# current context, which should be available in
# the session.info's dictionary, with a key of
# 'using_context').
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
plugin.extension_manager.extend_port_dict(
session, portdb, result)
else:
@ -236,7 +236,7 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
plugin = directory.get_plugin()
session = db_api.get_session_from_obj(portdb)
if session and session.is_active:
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
plugin.extension_manager.extend_port_dict_bulk(session,
results)
else:
@ -270,7 +270,7 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
plugin = directory.get_plugin()
session = db_api.get_session_from_obj(subnetdb)
if session and session.is_active:
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
plugin.extension_manager.extend_subnet_dict_bulk(session,
results)
else:
@ -304,7 +304,7 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
plugin = directory.get_plugin()
session = db_api.get_session_from_obj(subnetpooldb)
if session and session.is_active:
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
plugin.extension_manager.extend_subnetpool_dict_bulk(session,
results)
else:
@ -324,7 +324,7 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
# current context, which should be available in
# the session.info's dictionary, with a key of
# 'using_context').
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
plugin.extension_manager.extend_address_scope_dict(
session, address_scope, result)
else:
@ -339,7 +339,7 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
plugin = directory.get_plugin()
session = db_api.get_session_from_obj(address_scope)
if session and session.is_active:
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
plugin.extension_manager.extend_address_scope_dict_bulk(
session, results)
else:

View File

@ -175,6 +175,7 @@ class ApicL3Plugin(extraroute_db.ExtraRoute_db_mixin,
# Overwrite the upstream implementation to take advantage
# of the bulk extension support.
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_READER
def get_routers(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
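CONTEXT_READER also works as a decorator, wrapping the whole method in a reader transaction keyed off its context argument; that is what the added line above does for get_routers. A minimal sketch of the decorator form; the collection helper is hypothetical:

from neutron_lib.db import api as db_api


class RoutersMixin(object):

    @db_api.retry_if_session_inactive()
    @db_api.CONTEXT_READER
    def get_routers(self, context, filters=None, fields=None):
        # context.session is a reader session for the entire call, so
        # the bulk extension code runs without opening its own
        # transaction.
        return self._get_router_collection(context, filters, fields)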

View File

@ -373,6 +373,7 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
def create_l2_policy_postcommit(self, context):
if not context.current['l3_policy_id']:
self._use_implicit_l3_policy(context)
with db_api.CONTEXT_READER.using(context._plugin_context):
l3p_db = context._plugin._get_l3_policy(
context._plugin_context, context.current['l3_policy_id'])
if not context.current['network_id']:
@ -433,6 +434,7 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
def delete_l2_policy_postcommit(self, context):
auto_ptg_id = self._get_auto_ptg_id(context.current['id'])
try:
with db_api.CONTEXT_WRITER.using(context._plugin_context):
auto_ptg = context._plugin._get_policy_target_group(
context._plugin_context, auto_ptg_id)
l3p_db = context._plugin._get_l3_policy(
@ -632,6 +634,7 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
l2p_id = ptg['l2_policy_id']
if l2p_id:
with db_api.CONTEXT_READER.using(context):
l2p_db = context._plugin._get_l2_policy(
context._plugin_context, l2p_id)
if not l2p_db['policy_target_groups'] or (
@ -1748,6 +1751,7 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
"_process_contracts_for_default_epg(), create and "
"delete cannot be True at the same time")
raise
with db_api.CONTEXT_WRITER.using(context._plugin_context):
session = context._plugin_context.session
aim_ctx = aim_context.AimContext(session)

View File

@ -19,6 +19,7 @@ import sqlalchemy as sa
from gbpservice._i18n import _
from gbpservice.network.neutronv2 import local_api
from gbpservice.neutron.db import api as db_api
from gbpservice.neutron.extensions import driver_proxy_group as pg_ext
from gbpservice.neutron.extensions import group_policy as gbp_ext
from gbpservice.neutron.services.grouppolicy import (
@ -175,7 +176,7 @@ class ImplicitPolicyBase(api.PolicyDriver, local_api.LocalAPI):
session.add(owned)
def _l2_policy_is_owned(self, session, l2p_id):
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
return (session.query(OwnedL2Policy).
filter_by(l2_policy_id=l2p_id).
first() is not None)
@ -186,20 +187,26 @@ class ImplicitPolicyBase(api.PolicyDriver, local_api.LocalAPI):
session.add(owned)
def _l3_policy_is_owned(self, session, l3p_id):
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
return (session.query(OwnedL3Policy).
filter_by(l3_policy_id=l3p_id).
first() is not None)
def _cleanup_l3_policy(self, context, l3p_id):
if self._l3_policy_is_owned(context._plugin_context.session, l3p_id):
with db_api.CONTEXT_READER.using(context._plugin_context):
res = self._l3_policy_is_owned(context._plugin_context.session,
l3p_id)
if res:
# REVISIT(rkukura): Add check_unused parameter to
# local_api._delete_l3_policy()?
context._plugin.delete_l3_policy(context._plugin_context, l3p_id,
check_unused=True)
def _cleanup_l2_policy(self, context, l2p_id):
if self._l2_policy_is_owned(context._plugin_context.session, l2p_id):
with db_api.CONTEXT_READER.using(context._plugin_context):
res = self._l2_policy_is_owned(context._plugin_context.session,
l2p_id)
if res:
try:
self._delete_l2_policy(context._plugin_context, l2p_id)
except gbp_ext.L2PolicyInUse:

View File

@ -13,6 +13,7 @@
from neutron_lib.plugins import directory
from oslo_log import helpers as log
from gbpservice.neutron.db import api as db_api
from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
from gbpservice.neutron.services.grouppolicy.drivers import (
implicit_policy as ipd)
@ -45,6 +46,7 @@ class CommonNeutronBase(ipd.ImplicitPolicyBase, rmd.OwnedResourcesOperations,
def create_l2_policy_postcommit(self, context):
if not context.current['l3_policy_id']:
self._use_implicit_l3_policy(context)
with db_api.CONTEXT_READER.using(context._plugin_context):
l3p_db = context._plugin._get_l3_policy(
context._plugin_context, context.current['l3_policy_id'])
if not context.current['network_id']:

View File

@ -166,7 +166,7 @@ class OwnedResourcesOperations(object):
session.add(owned)
def _port_is_owned(self, session, port_id):
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
return (session.query(OwnedPort).
filter_by(port_id=port_id).
first() is not None)
@ -177,7 +177,7 @@ class OwnedResourcesOperations(object):
session.add(owned)
def _subnet_is_owned(self, session, subnet_id):
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
return (session.query(OwnedSubnet).
filter_by(subnet_id=subnet_id).
first() is not None)
@ -188,7 +188,7 @@ class OwnedResourcesOperations(object):
session.add(owned)
def _network_is_owned(self, session, network_id):
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
return (session.query(OwnedNetwork).
filter_by(network_id=network_id).
first() is not None)
@ -199,7 +199,7 @@ class OwnedResourcesOperations(object):
session.add(owned)
def _router_is_owned(self, session, router_id):
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
return (session.query(OwnedRouter).
filter_by(router_id=router_id).
first() is not None)
@ -210,7 +210,7 @@ class OwnedResourcesOperations(object):
session.add(owned)
def _address_scope_is_owned(self, session, address_scope_id):
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
return (session.query(OwnedAddressScope).
filter_by(address_scope_id=address_scope_id).
first() is not None)
@ -221,7 +221,7 @@ class OwnedResourcesOperations(object):
session.add(owned)
def _subnetpool_is_owned(self, session, subnetpool_id):
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
return (session.query(OwnedSubnetpool).
filter_by(subnetpool_id=subnetpool_id).
first() is not None)
@ -310,8 +310,10 @@ class ImplicitResourceOperations(local_api.LocalAPI,
return address_scope
def _cleanup_address_scope(self, plugin_context, address_scope_id):
if self._address_scope_is_owned(plugin_context.session,
address_scope_id):
with db_api.CONTEXT_READER.using(plugin_context):
res = self._address_scope_is_owned(plugin_context.session,
address_scope_id)
if res:
subpools = self._get_subnetpools(plugin_context,
filters={'address_scope_id':
[address_scope_id]})
@ -350,8 +352,10 @@ class ImplicitResourceOperations(local_api.LocalAPI,
ip_version=ip_version)
def _cleanup_subnetpool(self, plugin_context, subnetpool_id):
if self._subnetpool_is_owned(plugin_context.session,
subnetpool_id):
with db_api.CONTEXT_READER.using(plugin_context):
res = self._subnetpool_is_owned(plugin_context.session,
subnetpool_id)
if res:
subnets = self._get_subnets(plugin_context,
filters={'subnetpool_id':
[subnetpool_id]})
@ -382,7 +386,9 @@ class ImplicitResourceOperations(local_api.LocalAPI,
context.set_network_id(network['id'])
def _cleanup_network(self, plugin_context, network_id):
if self._network_is_owned(plugin_context.session, network_id):
with db_api.CONTEXT_READER.using(plugin_context.session) as session:
res = self._network_is_owned(session, network_id)
if res:
self._delete_network(plugin_context, network_id)
def _generate_subnets_from_cidrs(self, context, l2p, l3p, cidrs,
@ -433,6 +439,7 @@ class ImplicitResourceOperations(local_api.LocalAPI,
context, subnet_id)
def _get_l3p_allocated_subnets(self, context, l3p_id):
with db_api.CONTEXT_READER.using(context._plugin_context):
ptgs = context._plugin._get_l3p_ptgs(
context._plugin_context.elevated(), l3p_id)
return self._get_ptg_cidrs(context, None, ptg_dicts=ptgs)
@ -440,7 +447,7 @@ class ImplicitResourceOperations(local_api.LocalAPI,
def _validate_and_add_subnet(self, context, subnet, l3p_id):
subnet_id = subnet['id']
session = context._plugin_context.session
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
LOG.debug("starting validate_and_add_subnet transaction for "
"subnet %s", subnet_id)
ptgs = context._plugin._get_l3p_ptgs(
@ -619,7 +626,9 @@ class ImplicitResourceOperations(local_api.LocalAPI,
attrs.update(subnet_specifics)
subnet = self._create_subnet(context._plugin_context,
attrs)
self._mark_subnet_owned(context._plugin_context.session,
with db_api.CONTEXT_WRITER.using(context._plugin_context):
self._mark_subnet_owned(
context._plugin_context.session,
subnet['id'])
LOG.debug("Allocated subnet %(sub)s from subnetpool: "
"%(sp)s.", {'sub': subnet['id'],
@ -656,7 +665,10 @@ class ImplicitResourceOperations(local_api.LocalAPI,
except ext_l3.RouterInterfaceNotFoundForSubnet:
LOG.debug("Ignoring RouterInterfaceNotFoundForSubnet cleaning "
"up subnet: %s", subnet_id)
if self._subnet_is_owned(plugin_context.session, subnet_id):
with db_api.CONTEXT_READER.using(plugin_context):
res = self._subnet_is_owned(plugin_context.session, subnet_id)
if res:
self._delete_subnet(plugin_context, subnet_id)
def _get_default_security_group(self, plugin_context, ptg_id,
@ -729,7 +741,9 @@ class ImplicitResourceOperations(local_api.LocalAPI,
raise last
def _cleanup_port(self, plugin_context, port_id):
if self._port_is_owned(plugin_context.session, port_id):
with db_api.CONTEXT_READER.using(plugin_context):
res = self._port_is_owned(plugin_context.session, port_id)
if res:
try:
self._delete_port(plugin_context, port_id)
except n_exc.PortNotFound:
@ -770,7 +784,9 @@ class ImplicitResourceOperations(local_api.LocalAPI,
return router_id
def _cleanup_router(self, plugin_context, router_id):
if self._router_is_owned(plugin_context.session, router_id):
with db_api.CONTEXT_READER.using(plugin_context):
res = self._router_is_owned(plugin_context.session, router_id)
if res:
self._delete_router(plugin_context, router_id)
def _plug_router_to_subnet(self, plugin_context, subnet_id, router_id):
@ -919,8 +935,11 @@ class ImplicitResourceOperations(local_api.LocalAPI,
if (context.original['external_segment_id'] !=
context.current['external_segment_id']):
if context.original['subnet_id']:
if self._subnet_is_owned(context._plugin_context.session,
context.original['subnet_id']):
with db_api.CONTEXT_READER.using(context._plugin_context):
res = self._subnet_is_owned(
context._plugin_context.session,
context.original['subnet_id'])
if res:
self._delete_subnet(context._plugin_context,
context.original['subnet_id'])
if (context.current['external_segment_id'] and not
@ -980,8 +999,10 @@ class ImplicitResourceOperations(local_api.LocalAPI,
def _delete_subnet_on_nat_pool_delete(self, context):
if context.current['subnet_id']:
if self._subnet_is_owned(context._plugin_context.session,
context.current['subnet_id']):
with db_api.CONTEXT_READER.using(context._plugin_context):
res = self._subnet_is_owned(context._plugin_context.session,
context.current['subnet_id'])
if res:
self._delete_subnet(context._plugin_context,
context.current['subnet_id'])
@ -2679,7 +2700,7 @@ class ResourceMappingDriver(api.PolicyDriver, ImplicitResourceOperations,
@staticmethod
def _get_policy_rule_set_sg_mapping(session, policy_rule_set_id):
with session.begin(subtransactions=True):
with db_api.CONTEXT_READER.using(session):
return (session.query(PolicyRuleSetSGsMapping).
filter_by(policy_rule_set_id=policy_rule_set_id).one())

View File

@ -22,6 +22,8 @@ from neutron.tests import base
from neutron_lib import context
from sqlalchemy.orm import exc
from neutron_lib import context as n_context
from gbpservice.neutron.db import api as db_api
from gbpservice.nfp.common import constants as nfp_constants
from gbpservice.nfp.common import exceptions as nfp_exc
@ -75,7 +77,7 @@ class NFPDBTestCase(SqlTestCase):
super(NFPDBTestCase, self).setUp()
self.ctx = context.get_admin_context()
self.nfp_db = NFPDB()
self.session = db_api.get_writer_session()
self.session = n_context.get_admin_context().session
def create_network_function(self, attributes=None):
if attributes is None:
@ -90,7 +92,9 @@ class NFPDBTestCase(SqlTestCase):
'config_policy_id': 'config_policy_id',
'status': 'status'
}
return self.nfp_db.create_network_function(self.session, attributes)
with db_api.CONTEXT_WRITER.using(self.session):
return self.nfp_db.create_network_function(self.session,
attributes)
def test_create_network_function(self):
attrs = {
@ -140,6 +144,7 @@ class NFPDBTestCase(SqlTestCase):
'status': 'status'
}
network_function = self.create_network_function(attrs_all)
with db_api.CONTEXT_WRITER.using(self.session):
db_network_function = self.nfp_db.get_network_function(
self.session, network_function['id'])
for key in attrs_all:
@ -147,9 +152,11 @@ class NFPDBTestCase(SqlTestCase):
def test_list_network_function(self):
network_function = self.create_network_function()
with db_api.CONTEXT_WRITER.using(self.session):
network_functions = self.nfp_db.get_network_functions(self.session)
self.assertEqual(1, len(network_functions))
self.assertEqual(network_function['id'], network_functions[0]['id'])
self.assertEqual(network_function['id'],
network_functions[0]['id'])
def test_list_network_function_with_filters(self):
attrs = {
@ -161,10 +168,12 @@ class NFPDBTestCase(SqlTestCase):
}
network_function = self.create_network_function(attrs)
filters = {'service_id': ['service_id']}
with db_api.CONTEXT_READER.using(self.session):
network_functions = self.nfp_db.get_network_functions(
self.session, filters=filters)
self.assertEqual(1, len(network_functions))
self.assertEqual(network_function['id'], network_functions[0]['id'])
self.assertEqual(network_function['id'],
network_functions[0]['id'])
filters = {'service_id': ['nonexisting']}
network_functions = self.nfp_db.get_network_functions(
self.session, filters=filters)
@ -176,6 +185,7 @@ class NFPDBTestCase(SqlTestCase):
network_function = self.create_network_function()
self.assertIsNotNone(network_function['id'])
updated_network_function = {'status': 'ERROR'}
with db_api.CONTEXT_WRITER.using(self.session):
network_function = self.nfp_db.update_network_function(
self.session, network_function['id'], updated_network_function)
self.assertEqual('ERROR', network_function['status'])
@ -183,6 +193,7 @@ class NFPDBTestCase(SqlTestCase):
def test_delete_network_function(self):
network_function = self.create_network_function()
self.assertIsNotNone(network_function['id'])
with db_api.CONTEXT_WRITER.using(self.session):
self.nfp_db.delete_network_function(
self.session, network_function['id'])
self.assertRaises(nfp_exc.NetworkFunctionNotFound,
@ -213,6 +224,7 @@ class NFPDBTestCase(SqlTestCase):
],
'status': 'status'
}
with db_api.CONTEXT_WRITER.using(self.session):
return self.nfp_db.create_network_function_instance(
self.session, attributes)
@ -238,8 +250,10 @@ class NFPDBTestCase(SqlTestCase):
],
'status': 'status'
}
with db_api.CONTEXT_WRITER.using(self.session):
network_function_instance = (
self.nfp_db.create_network_function_instance(self.session, attrs))
self.nfp_db.create_network_function_instance(self.session,
attrs))
for key in attrs:
self.assertEqual(attrs[key], network_function_instance[key])
self.assertIsNotNone(network_function_instance['id'])
@ -253,6 +267,7 @@ class NFPDBTestCase(SqlTestCase):
'status': 'status',
'port_info': []
}
with db_api.CONTEXT_WRITER.using(self.session):
network_function_instance = (
self.nfp_db.create_network_function_instance(
self.session, attrs_mandatory))
@ -287,6 +302,7 @@ class NFPDBTestCase(SqlTestCase):
],
'status': 'status'
}
with db_api.CONTEXT_WRITER.using(self.session):
network_function_instance = (
self.nfp_db.create_network_function_instance(
self.session, attrs_all))
@ -294,15 +310,18 @@ class NFPDBTestCase(SqlTestCase):
self.nfp_db.get_network_function_instance(
self.session, network_function_instance['id']))
for key in attrs_all:
self.assertEqual(attrs_all[key], db_network_function_instance[key])
self.assertEqual(attrs_all[key],
db_network_function_instance[key])
def test_list_network_function_instance(self):
with db_api.CONTEXT_READER.using(self.session):
self.test_create_network_function_instance()
nf_instances = self.nfp_db.get_network_function_instances(
self.session)
self.assertEqual(1, len(nf_instances))
def test_list_network_function_instances_with_filters(self):
with db_api.CONTEXT_READER.using(self.session):
self.test_create_network_function_instance()
filters = {'ha_state': ['Active']}
nf_instances = self.nfp_db.get_network_function_instances(
@ -314,6 +333,7 @@ class NFPDBTestCase(SqlTestCase):
self.assertEqual([], nf_instances)
def test_update_network_function_instance(self):
with db_api.CONTEXT_WRITER.using(self.session):
network_function_instance = self.create_network_function_instance()
self.assertIsNotNone(network_function_instance['id'])
updated_nfi = {'status': 'ERROR'}
@ -325,6 +345,7 @@ class NFPDBTestCase(SqlTestCase):
network_function_instance = self.create_network_function_instance()
port_info = network_function_instance['port_info']
self.assertIsNotNone(network_function_instance['id'])
with db_api.CONTEXT_WRITER.using(self.session):
self.nfp_db.delete_network_function_instance(
self.session, network_function_instance['id'])
self.assertRaises(nfp_exc.NetworkFunctionInstanceNotFound,
@ -364,6 +385,7 @@ class NFPDBTestCase(SqlTestCase):
'port_role': nfp_constants.ACTIVE_PORT},
'status': 'status'
}
with db_api.CONTEXT_WRITER.using(self.session):
return self.nfp_db.create_network_function_device(
self.session, attributes)
@ -394,8 +416,10 @@ class NFPDBTestCase(SqlTestCase):
'port_role': nfp_constants.ACTIVE_PORT},
'status': 'status'
}
network_function_device = self.nfp_db.create_network_function_device(
self.session, attrs)
with db_api.CONTEXT_WRITER.using(self.session):
network_function_device = (
self.nfp_db.create_network_function_device(
self.session, attrs))
self.assertIn('gateway_port', network_function_device)
for key in attrs:
if (key == 'mgmt_port_id') or (key == 'monitoring_port_id'):
@ -416,6 +440,7 @@ class NFPDBTestCase(SqlTestCase):
'interfaces_in_use': 1,
'status': 'status'
}
with db_api.CONTEXT_WRITER.using(self.session):
nf_device = self.nfp_db.create_network_function_device(
self.session, attrs_mandatory)
for key in attrs_mandatory:
@ -454,10 +479,13 @@ class NFPDBTestCase(SqlTestCase):
'port_role': nfp_constants.ACTIVE_PORT},
'status': 'status'
}
network_function_device = self.nfp_db.create_network_function_device(
self.session, attrs)
db_network_function_device = self.nfp_db.get_network_function_device(
self.session, network_function_device['id'])
with db_api.CONTEXT_WRITER.using(self.session):
network_function_device = (
self.nfp_db.create_network_function_device(
self.session, attrs))
db_network_function_device = (
self.nfp_db.get_network_function_device(
self.session, network_function_device['id']))
for key in attrs:
if (key == 'mgmt_port_id') or (key == 'monitoring_port_id'):
self.assertEqual(attrs[key]['id'],
@ -466,20 +494,25 @@ class NFPDBTestCase(SqlTestCase):
self.assertEqual(attrs[key], db_network_function_device[key])
def test_list_network_function_device(self):
with db_api.CONTEXT_READER.using(self.session):
self.test_create_network_function_device()
network_function_devices = self.nfp_db.get_network_function_devices(
self.session)
network_function_devices = (
self.nfp_db.get_network_function_devices(
self.session))
self.assertEqual(1, len(network_function_devices))
def test_list_network_function_devices_with_filters(self):
with db_api.CONTEXT_READER.using(self.session):
self.test_create_network_function_device()
filters = {'service_vendor': ['service_vendor']}
network_function_devices = self.nfp_db.get_network_function_devices(
self.session, filters=filters)
network_function_devices = (
self.nfp_db.get_network_function_devices(
self.session, filters=filters))
self.assertEqual(1, len(network_function_devices))
filters = {'service_vendor': ['nonexisting']}
network_function_devices = self.nfp_db.get_network_function_devices(
self.session, filters=filters)
network_function_devices = (
self.nfp_db.get_network_function_devices(
self.session, filters=filters))
self.assertEqual([], network_function_devices)
def test_update_network_function_device(self):
@ -509,8 +542,10 @@ class NFPDBTestCase(SqlTestCase):
'port_role': nfp_constants.ACTIVE_PORT},
'status': 'status'
}
network_function_device = self.nfp_db.create_network_function_device(
self.session, attrs)
with db_api.CONTEXT_WRITER.using(self.session):
network_function_device = (
self.nfp_db.create_network_function_device(
self.session, attrs))
for key in attrs:
if (key == 'mgmt_port_id') or (key == 'monitoring_port_id'):
self.assertEqual(attrs[key]['id'],
@ -561,6 +596,7 @@ class NFPDBTestCase(SqlTestCase):
network_function_device = self.create_network_function_device()
mgmt_port_id = network_function_device['mgmt_port_id']
self.assertIsNotNone(network_function_device['id'])
with db_api.CONTEXT_WRITER.using(self.session):
self.nfp_db.delete_network_function_device(
self.session, network_function_device['id'])
self.assertRaises(nfp_exc.NetworkFunctionDeviceNotFound,
@ -582,6 +618,7 @@ class NFPDBTestCase(SqlTestCase):
)
def test_add_service_gateway_details(self):
with db_api.CONTEXT_WRITER.using(self.session):
gateway_details = self._get_gateway_details()
gateway = self.nfp_db.add_service_gateway_details(
self.session, gateway_details)
@ -597,6 +634,7 @@ class NFPDBTestCase(SqlTestCase):
self.assertIsNotNone(gateway['id'])
def test_get_gateway_detail(self):
with db_api.CONTEXT_WRITER.using(self.session):
gateway_details = self._get_gateway_details()
gateway = self.nfp_db.add_service_gateway_details(
self.session, gateway_details)
@ -607,6 +645,7 @@ class NFPDBTestCase(SqlTestCase):
(_gateway['id'], _gateway['network_function_id']))
def test_get_providers_for_gateway(self):
with db_api.CONTEXT_WRITER.using(self.session):
gateway_details = self._get_gateway_details()
gateway = self.nfp_db.add_service_gateway_details(
self.session, gateway_details)
@ -617,11 +656,13 @@ class NFPDBTestCase(SqlTestCase):
(_gateway['id'], _gateway['network_function_id']))
def test_delete_gateway(self):
with db_api.CONTEXT_WRITER.using(self.session):
gateway_details = self._get_gateway_details()
gateway = self.nfp_db.add_service_gateway_details(
self.session, gateway_details)
self.assertIsNotNone(gateway['id'])
self.nfp_db.delete_network_function(self.session, gateway_details[
'network_function_id'])
self.assertRaises(exc.NoResultFound, self.nfp_db.get_gateway_detail,
self.assertRaises(exc.NoResultFound,
self.nfp_db.get_gateway_detail,
self.session, gateway_details['network_function_id'])

View File

@ -335,7 +335,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
self.useFixture(AimSqlFixture())
super(ApicAimTestCase, self).setUp(PLUGIN_NAME,
service_plugins=service_plugins)
self.db_session = db_api.get_writer_session()
ctx = n_context.get_admin_context()
# TODO(pulkit): replace with AIM writer context once API supports it.
with db_api.CONTEXT_WRITER.using(ctx):
self.db_session = ctx.session
self.initialize_db_config(self.db_session)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
@ -392,6 +395,8 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
pd_api.VALIDATION_PASSED, self.validation_mgr.validate())
def _find_by_dn(self, dn, cls):
# TODO(pulkit): replace with AIM reader context once API supports it.
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
resource = cls.from_dn(dn)
return self.aim_mgr.get(aim_ctx, resource)
@ -490,8 +495,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
return self.deserialize(self.fmt, res)
def _get_sg(self, sg_name, tenant_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
# TODO(pulkit): replace with AIM reader context once API supports it.
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
sg = aim_resource.SecurityGroup(tenant_name=tenant_name,
name=sg_name)
sg = self.aim_mgr.get(aim_ctx, sg)
@ -499,15 +506,19 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
return sg
def _sg_should_not_exist(self, sg_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
# TODO(pulkit): replace with AIM reader context once API supports it.
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
sgs = self.aim_mgr.find(
aim_ctx, aim_resource.SecurityGroup, name=sg_name)
self.assertEqual([], sgs)
def _get_sg_subject(self, sg_subject_name, sg_name, tenant_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
# TODO(pulkit): replace with AIM reader context once API supports it.
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
sg_subject = aim_resource.SecurityGroupSubject(
tenant_name=tenant_name, security_group_name=sg_name,
name=sg_subject_name)
@ -517,8 +528,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
def _get_sg_rule(self, sg_rule_name, sg_subject_name, sg_name,
tenant_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
# TODO(pulkit): replace with AIM reader context once API supports it.
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
sg_rule = aim_resource.SecurityGroupRule(
tenant_name=tenant_name, security_group_name=sg_name,
security_group_subject_name=sg_subject_name, name=sg_rule_name)
@ -527,8 +540,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
return sg_rule
def _get_contract(self, contract_name, tenant_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
# TODO(pulkit): replace with AIM reader context once API supports it.
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
contract = aim_resource.Contract(tenant_name=tenant_name,
name=contract_name)
contract = self.aim_mgr.get(aim_ctx, contract)
@ -536,8 +551,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
return contract
def _get_subject(self, subject_name, contract_name, tenant_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
# TODO(pulkit): replace with AIM reader context once API supports it.
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
subject = aim_resource.ContractSubject(tenant_name=tenant_name,
contract_name=contract_name,
name=subject_name)
@ -570,8 +587,9 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
self.assertFalse(dns)
def _sg_rule_should_not_exist(self, sg_rule_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
sg_rules = self.aim_mgr.find(
aim_ctx, aim_resource.SecurityGroupRule, name=sg_rule_name)
self.assertEqual([], sg_rules)
@ -645,8 +663,9 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
network['apic:distinguished_names']['EndpointGroup'])
def _get_tenant(self, tenant_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
tenant = aim_resource.Tenant(name=tenant_name)
tenant = self.aim_mgr.get(aim_ctx, tenant)
self.assertIsNotNone(tenant)
@ -886,8 +905,9 @@ class TestAimMapping(ApicAimTestCase):
super(TestAimMapping, self).tearDown()
def _get_vrf(self, vrf_name, tenant_name, should_exist=True):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
vrf = aim_resource.VRF(tenant_name=tenant_name,
name=vrf_name)
vrf = self.aim_mgr.get(aim_ctx, vrf)
@ -898,14 +918,16 @@ class TestAimMapping(ApicAimTestCase):
self.assertIsNone(vrf)
def _vrf_should_not_exist(self, vrf_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
vrfs = self.aim_mgr.find(aim_ctx, aim_resource.VRF, name=vrf_name)
self.assertEqual([], vrfs)
def _get_bd(self, bd_name, tenant_name, bd_dn=None):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
if bd_dn:
bd = aim_resource.BridgeDomain.from_dn(bd_dn)
else:
@ -916,22 +938,25 @@ class TestAimMapping(ApicAimTestCase):
return bd
def _bd_should_not_exist(self, bd_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
bds = self.aim_mgr.find(
aim_ctx, aim_resource.BridgeDomain, name=bd_name)
self.assertEqual([], bds)
def _l3out_should_not_exist(self, l3out_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
l3outs = self.aim_mgr.find(
aim_ctx, aim_resource.L3Outside, name=l3out_name)
self.assertEqual([], l3outs)
def _get_subnet(self, gw_ip_mask, bd_name, tenant_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
subnet = aim_resource.Subnet(tenant_name=tenant_name,
bd_name=bd_name,
gw_ip_mask=gw_ip_mask)
@ -940,8 +965,9 @@ class TestAimMapping(ApicAimTestCase):
return subnet
def _subnet_should_not_exist(self, gw_ip_mask, bd_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
subnets = self.aim_mgr.find(
aim_ctx, aim_resource.Subnet, bd_name=bd_name,
gw_ip_mask=gw_ip_mask)
@ -958,30 +984,34 @@ class TestAimMapping(ApicAimTestCase):
return epg
def _epg_should_not_exist(self, epg_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
epgs = self.aim_mgr.find(aim_ctx, aim_resource.EndpointGroup,
name=epg_name)
self.assertEqual([], epgs)
def _contract_should_not_exist(self, contract_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
contracts = self.aim_mgr.find(aim_ctx, aim_resource.Contract,
name=contract_name)
self.assertEqual([], contracts)
def _subject_should_not_exist(self, subject_name, contract_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
subjects = self.aim_mgr.find(
aim_ctx, aim_resource.ContractSubject,
subject_name=subject_name, name=contract_name)
self.assertEqual([], subjects)
def _get_filter(self, filter_name, tenant_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
filter = aim_resource.Filter(tenant_name=tenant_name,
name=filter_name)
filter = self.aim_mgr.get(aim_ctx, filter)
@ -989,8 +1019,9 @@ class TestAimMapping(ApicAimTestCase):
return filter
def _get_filter_entry(self, entry_name, filter_name, tenant_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
entry = aim_resource.FilterEntry(tenant_name=tenant_name,
filter_name=filter_name,
name=entry_name)
@ -999,8 +1030,9 @@ class TestAimMapping(ApicAimTestCase):
return entry
def _get_l3out(self, l3out_name, tenant_name):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
l3out = aim_resource.L3Outside(tenant_name=tenant_name,
name=l3out_name)
l3out = self.aim_mgr.get(aim_ctx, l3out)
@ -1008,8 +1040,9 @@ class TestAimMapping(ApicAimTestCase):
return l3out
def _get_l3out_ext_net(self, aim_ext_net):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
aim_ext_net = self.aim_mgr.get(aim_ctx, aim_ext_net)
self.assertIsNotNone(aim_ext_net)
return aim_ext_net
@ -1512,6 +1545,7 @@ class TestAimMapping(ApicAimTestCase):
self._delete('networks', net_id)
# Test create with valid pre-existing BD.
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
tenant_name = self.name_mapper.project(None, net['tenant_id'])
bd = aim_resource.BridgeDomain(tenant_name=tenant_name,
@ -1537,7 +1571,8 @@ class TestAimMapping(ApicAimTestCase):
'BridgeDomain': 'someotherbd'}}}
result = self._update('networks', net_id, data,
expected_code=webob.exc.HTTPBadRequest.code)
err_msg = 'Cannot update read-only attribute apic:distinguished_names'
err_msg = (
'Cannot update read-only attribute apic:distinguished_names')
self.assertEqual('HTTPBadRequest', result['NeutronError']['type'])
self.assertEqual(err_msg, result['NeutronError']['message'])
@ -1558,7 +1593,7 @@ class TestAimMapping(ApicAimTestCase):
self.assertIsNone(bd_deleted)
def test_svi_network_lifecycle(self):
session = db_api.get_writer_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
# test create.
@ -1588,7 +1623,8 @@ class TestAimMapping(ApicAimTestCase):
# test delete
self._delete('networks', net['id'])
self.assertFalse(extn.get_network_extn_db(session, net['id']))
with db_api.CONTEXT_WRITER.using(ctx):
self.assertFalse(extn.get_network_extn_db(ctx.session, net['id']))
self._check_network_deleted(net)
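On the write side the same change replaces db_api.get_writer_session() with an admin context and CONTEXT_WRITER, and the ExtensionDbMixin helpers are handed ctx.session. A hedged sketch of the post-delete check above, assuming get_network_extn_db(session, net_id) behaves as shown in these hunks; network_extension_removed is a placeholder name. A reader transaction would also do for a pure lookup, the SVI test simply uses the writer.

from neutron_lib import context as n_context
from neutron_lib.db import api as db_api


def network_extension_removed(extn_mixin, net_id):
    # True once no extension record is left for the deleted network.
    ctx = n_context.get_admin_context()
    with db_api.CONTEXT_WRITER.using(ctx):
        return not extn_mixin.get_network_extn_db(ctx.session, net_id)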
def _test_invalid_network_exceptions(self, kwargs):
@ -3648,6 +3684,7 @@ class TestAimMapping(ApicAimTestCase):
self._validate()
def test_address_scope_pre_existing_vrf(self):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
self.aim_mgr.create(
@ -3675,6 +3712,7 @@ class TestAimMapping(ApicAimTestCase):
self.assertEqual('CTX1', vrf.display_name)
def test_network_in_address_scope_pre_existing_vrf(self, common_vrf=False):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
if not common_vrf:
@ -3688,12 +3726,15 @@ class TestAimMapping(ApicAimTestCase):
name='ctx1', monitored=True)
vrf = self.aim_mgr.create(aim_ctx, vrf)
scope = self._make_address_scope_for_vrf(vrf.dn,
scope = self._make_address_scope_for_vrf(
vrf.dn,
name='as1')['address_scope']
pool = self._make_subnetpool(
self.fmt, ['10.0.0.0/8'], name='sp', address_scope_id=scope['id'],
tenant_id=scope['tenant_id'], default_prefixlen=24)['subnetpool']
self.fmt, ['10.0.0.0/8'], name='sp',
address_scope_id=scope['id'],
tenant_id=scope['tenant_id'],
default_prefixlen=24)['subnetpool']
net = self._make_network(self.fmt, 'net1', True)['network']
subnet = self._make_subnet(
@ -3703,7 +3744,8 @@ class TestAimMapping(ApicAimTestCase):
router = self._make_router(self.fmt, self._tenant_id,
'router1')['router']
self._router_interface_action('add', router['id'], subnet['id'], None)
self._router_interface_action('add', router['id'], subnet['id'],
None)
net = self._show('networks', net['id'])['network']
self._check_network(net, routers=[router], vrf=vrf)
@ -4125,6 +4167,7 @@ class TestSyncState(ApicAimTestCase):
def test_external_network(self):
ext_net = aim_resource.ExternalNetwork.from_dn(self.dn_t1_l1_n1)
ext_net.monitored = True
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
self.aim_mgr.create(aim_ctx, ext_net)
@ -4204,6 +4247,7 @@ class TestSyncState(ApicAimTestCase):
def test_external_subnet(self):
ext_net = aim_resource.ExternalNetwork.from_dn(self.dn_t1_l1_n1)
ext_net.monitored = True
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
self.aim_mgr.create(aim_ctx, ext_net)
@ -4231,8 +4275,10 @@ class TestSyncState(ApicAimTestCase):
def _test_erspan_sync(self, expected_state, with_erspan=True):
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(
db_session=db_api.get_writer_session())
db_session=ctx.session)
self._register_agent('host1', AGENT_CONF_OPFLEX)
self._register_agent('host2', AGENT_CONF_OPFLEX)
# Host 1: VPC host
@ -4922,8 +4968,9 @@ class TestMigrations(ApicAimTestCase, db.DbMixin):
self.fmt, 4, name='as1')['address_scope']
scope1_id = scope['id']
scope1_vrf = scope[DN]['VRF']
mapping = self._get_address_scope_mapping(self.db_session, scope1_id)
with self.db_session.begin():
mapping = self._get_address_scope_mapping(self.db_session,
scope1_id)
self.db_session.delete(mapping)
# Create an address scope with pre-existing VRF, delete its
@ -4937,8 +4984,9 @@ class TestMigrations(ApicAimTestCase, db.DbMixin):
scope2_id = scope['id']
scope2_vrf = scope[DN]['VRF']
self.assertEqual(vrf.dn, scope2_vrf)
mapping = self._get_address_scope_mapping(self.db_session, scope2_id)
with self.db_session.begin():
mapping = self._get_address_scope_mapping(self.db_session,
scope2_id)
self.db_session.delete(mapping)
old_db = data_migrations.DefunctAddressScopeExtensionDb(
address_scope_id=scope2_id, vrf_dn=scope2_vrf)
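The TestMigrations hunks wrap the ORM deletes of the mapping rows in an explicit session transaction. A small sketch of that wrapper, assuming db_session is the test's SQLAlchemy session and that no transaction is already open on it; delete_row is a placeholder name, and in the hunks above row is the address-scope or network mapping fetched just before the delete.

def delete_row(db_session, row):
    # begin() opens the transaction that the delete previously relied on
    # being started implicitly.
    with db_session.begin():
        db_session.delete(row)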
@ -4951,8 +4999,8 @@ class TestMigrations(ApicAimTestCase, db.DbMixin):
net1_bd = net[DN]['BridgeDomain']
net1_epg = net[DN]['EndpointGroup']
net1_vrf = net[DN]['VRF']
mapping = self._get_network_mapping(self.db_session, net1_id)
with self.db_session.begin():
mapping = self._get_network_mapping(self.db_session, net1_id)
self.db_session.delete(mapping)
# Create an external network and delete its mapping.
@ -4961,8 +5009,8 @@ class TestMigrations(ApicAimTestCase, db.DbMixin):
net2_bd = net[DN]['BridgeDomain']
net2_epg = net[DN]['EndpointGroup']
net2_vrf = net[DN]['VRF']
mapping = self._get_network_mapping(self.db_session, net2_id)
with self.db_session.begin():
mapping = self._get_network_mapping(self.db_session, net2_id)
self.db_session.delete(mapping)
# Create an unmanaged external network and verify it has no
@ -5080,6 +5128,7 @@ class TestMigrations(ApicAimTestCase, db.DbMixin):
self._router_interface_action('add', router['id'], sub['id'], None)
aim = self.aim_mgr
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
ns = self.driver._nat_type_to_strategy(None)
ext_net = aim_resource.ExternalNetwork.from_dn(
@ -5152,6 +5201,7 @@ class TestMigrations(ApicAimTestCase, db.DbMixin):
'Resource: %s still in AIM' % r)
def test_security_group_migration_sanity(self):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
self._make_network(self.fmt, 'net1', True)
sgs = self.aim_mgr.find(aim_ctx, aim_resource.SecurityGroup)
@ -5203,6 +5253,7 @@ class TestMigrations(ApicAimTestCase, db.DbMixin):
self._check_sg_rule(sg['id'], sg_rule)
# Wipe out the remote_group_id of all the sg_rules.
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
sg_rules = self.aim_mgr.find(aim_ctx, aim_resource.SecurityGroupRule)
for sg_rule in sg_rules:
@ -5257,6 +5308,7 @@ class TestMigrations(ApicAimTestCase, db.DbMixin):
'ip_address_v4': owned_addr[0],
'network_id': p2['network_id']}
self.update_ip_owner(ip_owner_info)
with db_api.CONTEXT_WRITER.using(self.db_session):
data_migrations.do_ha_ip_duplicate_entries_removal(self.db_session)
dump = self.get_ha_port_associations()
self.assertEqual(2, len(dump))
@ -5294,7 +5346,7 @@ class TestMigrations(ApicAimTestCase, db.DbMixin):
'ip_address_v4': owned_addr[0],
'network_id': p2['network_id']}
self.update_ip_owner(ip_owner_info)
with self.db_session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(self.db_session):
dump = self.get_ha_port_associations()
self.assertEqual(2, len(dump))
obj = self.get_port_for_ha_ipaddress(
@ -5474,6 +5526,7 @@ class TestPortBinding(ApicAimTestCase):
port['binding:vif_details'])
def test_dualstack_svi_opflex_agent(self):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
hlink1 = aim_infra.HostLink(
@ -5577,6 +5630,7 @@ class TestPortBinding(ApicAimTestCase):
self._test_dualstack_svi_opflex_agent_add_subnet(vpc=True)
def _test_dualstack_svi_opflex_agent_add_subnet(self, vpc=False):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
if vpc:
@ -5805,6 +5859,7 @@ class TestPortBinding(ApicAimTestCase):
def test_bind_opflex_agent_svi(self):
self._register_agent('host1', AGENT_CONF_OPFLEX)
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
hlink_1 = aim_infra.HostLink(
host_name='host1',
@ -6099,6 +6154,7 @@ class TestPortBinding(ApicAimTestCase):
subports = []
baremetal_physnet = 'physnet1'
parent_physnet = subport_physnet = None
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
# All baremetal VNIC ports are bound to VLAN type segments. For
@ -6829,6 +6885,7 @@ class TestDomains(ApicAimTestCase):
# REVISIT: Does this test anything that is not covered by
# TestPortOnPhysicalNode.test_mixed_ports_on_network_with_default_domains?
def test_aim_epg_domains(self):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
self.aim_mgr.create(aim_ctx,
aim_resource.VMMDomain(type='OpenStack',
@ -6897,7 +6954,7 @@ class TestMl2SubnetPoolsV2(test_plugin.TestSubnetPoolsV2,
class TestExtensionAttributes(ApicAimTestCase):
def test_bgp_enabled_network_lifecycle(self):
session = db_api.get_writer_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
# Test create SVI network without BGP.
@ -6961,12 +7018,14 @@ class TestExtensionAttributes(ApicAimTestCase):
# Test delete.
self._delete('networks', net1['id'])
self.assertFalse(extn.get_network_extn_db(session, net1['id']))
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_network_extn_db(ctx.session, net1['id']))
self._delete('networks', net2['id'])
self.assertFalse(extn.get_network_extn_db(session, net2['id']))
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_network_extn_db(ctx.session, net2['id']))
def test_network_with_nested_domain_lifecycle(self):
session = db_api.get_reader_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
vlan_dict = {'vlans_list': ['2', '3', '4', '3'],
'vlan_ranges': [{'start': '6', 'end': '9'},
@ -7014,14 +7073,15 @@ class TestExtensionAttributes(ApicAimTestCase):
# Test delete.
self._delete('networks', net1['id'])
self.assertFalse(extn.get_network_extn_db(session, net1['id']))
db_vlans = (session.query(
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_network_extn_db(ctx.session, net1['id']))
db_vlans = (ctx.session.query(
extn_db.NetworkExtNestedDomainAllowedVlansDb).filter_by(
network_id=net1['id']).all())
self.assertEqual([], db_vlans)
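Queries that used to run on a bare reader session now go through ctx.session inside a CONTEXT_READER block; the nested-domain VLAN check above is one instance. A sketch of that lookup, with model_cls standing in for extn_db.NetworkExtNestedDomainAllowedVlansDb and rows_for_network as a placeholder name.

from neutron_lib import context as n_context
from neutron_lib.db import api as db_api


def rows_for_network(model_cls, network_id):
    ctx = n_context.get_admin_context()
    with db_api.CONTEXT_READER.using(ctx):
        # All extension rows tied to the network; the tests expect []
        # once the network has been deleted.
        return ctx.session.query(model_cls).filter_by(
            network_id=network_id).all()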
def test_network_with_extra_contracts_lifecycle(self):
session = db_api.get_reader_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
# Create network with extra contracts.
@ -7065,14 +7125,15 @@ class TestExtensionAttributes(ApicAimTestCase):
# Test delete.
self._delete('networks', net_id)
self.assertFalse(extn.get_network_extn_db(session, net_id))
db_contracts = (session.query(
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_network_extn_db(ctx.session, net_id))
db_contracts = (ctx.session.query(
extn_db.NetworkExtExtraContractDb).filter_by(
network_id=net_id).all())
self.assertEqual([], db_contracts)
def test_network_with_epg_contract_masters_lifecycle(self):
session = db_api.get_reader_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
# Create network with EPG contract masters
@ -7113,14 +7174,15 @@ class TestExtensionAttributes(ApicAimTestCase):
# Test delete.
self._delete('networks', net_id)
self.assertFalse(extn.get_network_extn_db(session, net_id))
db_masters = (session.query(
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_network_extn_db(ctx.session, net_id))
db_masters = (ctx.session.query(
extn_db.NetworkExtEpgContractMasterDb).filter_by(
network_id=net_id).all())
self.assertEqual([], db_masters)
def test_network_with_policy_enforcement_pref_lifecycle(self):
session = db_api.get_reader_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
# Create network with default Policy Enforcement Pref
@ -7156,14 +7218,15 @@ class TestExtensionAttributes(ApicAimTestCase):
# Test delete.
self._delete('networks', net_id)
self.assertFalse(extn.get_network_extn_db(session, net_id))
db_masters = (session.query(
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_network_extn_db(ctx.session, net_id))
db_masters = (ctx.session.query(
extn_db.NetworkExtEpgContractMasterDb).filter_by(
network_id=net_id).all())
self.assertEqual([], db_masters)
def test_network_with_no_nat_cidrs_lifecycle(self):
session = db_api.get_reader_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
# Create network with default no nat cidrs extension
@ -7199,14 +7262,15 @@ class TestExtensionAttributes(ApicAimTestCase):
# Test delete.
self._delete('networks', net_id)
self.assertFalse(extn.get_network_extn_db(session, net_id))
db_masters = (session.query(
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_network_extn_db(ctx.session, net_id))
db_masters = (ctx.session.query(
extn_db.NetworkExtensionNoNatCidrsDb).filter_by(
network_id=net_id).all())
self.assertEqual([], db_masters)
def test_external_network_lifecycle(self):
session = db_api.get_reader_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
# create with APIC DN, nat_type and default CIDR
@ -7288,11 +7352,12 @@ class TestExtensionAttributes(ApicAimTestCase):
self._delete('networks', net2['id'])
self._delete('networks', net1['id'])
self.assertFalse(extn.get_network_extn_db(session, net1['id']))
self.assertFalse(extn.get_network_extn_db(session, net2['id']))
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_network_extn_db(ctx.session, net1['id']))
self.assertFalse(extn.get_network_extn_db(ctx.session, net2['id']))
def test_external_network_with_multi_nets_lifecycle(self):
session = db_api.get_reader_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
# create with APIC DN, nat_type and default CIDR
@ -7376,8 +7441,9 @@ class TestExtensionAttributes(ApicAimTestCase):
self._delete('networks', net2['id'])
self._delete('networks', net1['id'])
self.assertFalse(extn.get_network_extn_db(session, net1['id']))
self.assertFalse(extn.get_network_extn_db(session, net2['id']))
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_network_extn_db(ctx.session, net1['id']))
self.assertFalse(extn.get_network_extn_db(ctx.session, net2['id']))
def test_external_network_fail(self):
# APIC DN not specified
@ -7455,7 +7521,7 @@ class TestExtensionAttributes(ApicAimTestCase):
self.assertTrue(subnet1[EPG_SUBNET])
def test_external_subnet_lifecycle(self):
session = db_api.get_reader_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
net1 = self._make_ext_network('net1',
@ -7494,7 +7560,9 @@ class TestExtensionAttributes(ApicAimTestCase):
# delete subnet
self._delete('subnets', subnet['id'])
self.assertFalse(extn.get_subnet_extn_db(session, subnet['id']))
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_subnet_extn_db(ctx.session,
subnet['id']))
# Simulate a prior existing subnet (i.e. no extension attrs exist)
# Get should give default value, and updates should stick
@ -7502,11 +7570,11 @@ class TestExtensionAttributes(ApicAimTestCase):
self.fmt, {'network': net1}, '20.0.0.1', '20.0.0.0/24')['subnet']
self._update('subnets', subnet2['id'],
{'subnet': {SNAT_POOL: True}})
with session.begin(subtransactions=True):
db_obj = session.query(extn_db.SubnetExtensionDb).filter(
with db_api.CONTEXT_WRITER.using(ctx):
db_obj = ctx.session.query(extn_db.SubnetExtensionDb).filter(
extn_db.SubnetExtensionDb.subnet_id ==
subnet2['id']).one()
session.delete(db_obj)
ctx.session.delete(db_obj)
subnet2 = self._show('subnets', subnet2['id'])['subnet']
self.assertFalse(subnet2[SNAT_POOL])
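Where the old code opened session.begin(subtransactions=True) to drop an extension row, the new code opens a CONTEXT_WRITER transaction and uses ctx.session throughout. A sketch of that delete, with model_cls standing in for extn_db.SubnetExtensionDb and drop_subnet_extension as a placeholder name.

from neutron_lib import context as n_context
from neutron_lib.db import api as db_api


def drop_subnet_extension(model_cls, subnet_id):
    ctx = n_context.get_admin_context()
    with db_api.CONTEXT_WRITER.using(ctx):
        db_obj = ctx.session.query(model_cls).filter(
            model_cls.subnet_id == subnet_id).one()
        ctx.session.delete(db_obj)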
@ -7531,10 +7599,12 @@ class TestExtensionAttributes(ApicAimTestCase):
# delete EPG subnet
self._delete('subnets', epg_subnet['id'])
self.assertFalse(extn.get_subnet_extn_db(session, epg_subnet['id']))
with db_api.CONTEXT_READER.using(ctx):
self.assertFalse(extn.get_subnet_extn_db(ctx.session,
epg_subnet['id']))
def test_router_lifecycle(self):
session = db_api.get_reader_session()
ctx = n_context.get_admin_context()
extn = extn_db.ExtensionDbMixin()
# create router with default values
@ -7578,7 +7648,9 @@ class TestExtensionAttributes(ApicAimTestCase):
rtr2 = self._make_router(self.fmt, 'test-tenant', 'router2',
arg_list=self.extension_attributes,
**{PROV: ['k'], CONS: ['k']})['router']
extn.set_router_extn_db(session, rtr2['id'], {PROV: [], CONS: []})
with db_api.CONTEXT_WRITER.using(ctx):
extn.set_router_extn_db(ctx.session, rtr2['id'],
{PROV: [], CONS: []})
rtr2 = self._show('routers', rtr2['id'])['router']
self.assertEqual([], rtr2[PROV])
self.assertEqual([], rtr2[CONS])
@ -7615,12 +7687,14 @@ class TestExtensionAttributes(ApicAimTestCase):
# delete
self._delete('routers', rtr1['id'])
with db_api.CONTEXT_READER.using(ctx):
self.assertEqual({PROV: [], CONS: []},
extn.get_router_extn_db(session, rtr1['id']))
extn.get_router_extn_db(ctx.session, rtr1['id']))
def test_address_scope_lifecycle(self):
session = db_api.get_writer_session()
aim_ctx = aim_context.AimContext(db_session=session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(db_session=ctx.session)
# Create VRF.
self.aim_mgr.create(
@ -7730,8 +7804,10 @@ class TestExtensionAttributes(ApicAimTestCase):
self.assertIn('is not valid VRF DN', resp['NeutronError']['message'])
# Update APIC DN
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(
db_session=db_api.get_writer_session())
db_session=ctx.session)
self.aim_mgr.create(
aim_ctx, aim_resource.Tenant(name=self.t1_aname, monitored=True))
vrf = aim_resource.VRF(tenant_name=self.t1_aname, name='default',
@ -7881,8 +7957,10 @@ class TestExtensionAttributes(ApicAimTestCase):
result['NeutronError']['type'])
def test_erspan_aim_config(self, network_type='opflex'):
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(
db_session=db_api.get_writer_session())
db_session=ctx.session)
self._register_agent('host1', AGENT_CONF_OPFLEX)
self._register_agent('host2', AGENT_CONF_OPFLEX)
# Host 1: VPC host
@ -8335,8 +8413,9 @@ class TestExternalConnectivityBase(object):
def _do_test_router_interface(self, use_addr_scope=False,
single_tenant=False):
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
cv = self.mock_ns.connect_vrf
dv = self.mock_ns.disconnect_vrf
@ -8696,8 +8775,9 @@ class TestExternalConnectivityBase(object):
self._router_interface_action('add', router['id'], sub1['id'], None)
self.mock_ns.connect_vrf.assert_not_called()
session = db_api.get_reader_session()
aim_ctx = aim_context.AimContext(session)
ctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
aim_ctx = aim_context.AimContext(ctx.session)
tenant_aname = self.name_mapper.project(None, net['tenant_id'])
aname = self.name_mapper.network(None, net['id'])
aim_bd = aim_resource.BridgeDomain(tenant_name=tenant_aname,
@ -9170,11 +9250,13 @@ class TestExternalConnectivityBase(object):
cv = self.mock_ns.connect_vrf
dv = self.mock_ns.disconnect_vrf
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
# create pre-existing VRF
vrf = aim_resource.VRF(tenant_name='common', name='ctx1',
monitored=True)
with self.db_session.begin():
vrf = self.aim_mgr.create(aim_ctx, vrf)
vrf.monitored = False
@ -9260,6 +9342,7 @@ class TestExternalConnectivityBase(object):
{'host': 'h1', 'name': 'ph1', 'type': 'PhysDom'},
{'host': 'h2', 'name': 'ph2', 'type': 'PhysDom'},
{'host': 'h3', 'name': 'ph3', 'type': 'PhysDom'}]
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
# test setup
vmm_domains = self._create_domains_and_mappings(aim_ctx, mappings,
@ -9421,7 +9504,9 @@ class TestExternalNoNat(TestExternalConnectivityBase,
def fix_l3out_vrf(self, l3out_tenant_name, l3out_name, vrf_name):
l3out = aim_resource.L3Outside(tenant_name=l3out_tenant_name,
name=l3out_name)
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
with self.db_session.begin():
self.aim_mgr.update(aim_ctx, l3out, vrf_name=vrf_name)
def _validate(self):
@ -9669,6 +9754,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
kwargs['tenant_network_types'] = ['vlan']
super(TestPortVlanNetwork, self).setUp(**kwargs)
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
self.hlink1 = aim_infra.HostLink(
host_name='h1',
@ -9681,6 +9767,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
mock_notif = mock.Mock(side_effect=self.port_notif_verifier())
self.driver.notifier.port_update = mock_notif
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
if external_net:
net1 = self._make_ext_network('net1',
@ -9753,6 +9840,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
self._do_test_port_lifecycle(external_net=True)
def test_pre_existing_l3out_svi_bgp(self):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
myl3out = aim_resource.L3Outside(tenant_name=self.t1_aname, name='l1')
@ -9915,6 +10003,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
self._check_no_dynamic_segment(net2['id'])
def _test_multiple_ports_on_host(self, is_svi=False, bgp_enabled=False):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
if not is_svi:
@ -10090,6 +10179,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
self._test_multiple_ports_on_host(is_svi=True, bgp_enabled=True)
def _test_multiple_networks_on_host(self, is_svi=False, bgp_enabled=False):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
if not is_svi:
@ -10345,6 +10435,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
self._test_multiple_networks_on_host(is_svi=True, bgp_enabled=True)
def _test_ports_with_2_hostlinks(self, is_svi=False, bgp_enabled=False):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
hlink_1a = aim_infra.HostLink(
host_name='h1',
@ -10687,6 +10778,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
self._test_ports_with_2_hostlinks(is_svi=True, bgp_enabled=True)
def _test_network_on_multiple_hosts(self, is_svi=False, bgp_enabled=False):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
if not is_svi:
@ -10881,6 +10973,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
self.driver.apic_router_id_pool = '199.199.199.1'
self.driver.apic_router_id_subnet = netaddr.IPSet(
[self.driver.apic_router_id_pool])
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
net1 = self._make_network(self.fmt, 'net1', True,
arg_list=self.extension_attributes,
@ -10901,6 +10994,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
result['NeutronError']['type'])
def test_port_binding_missing_hostlink(self):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
net1 = self._make_network(self.fmt, 'net1', True)['network']
@ -10928,6 +11022,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
def _test_topology_rpc_no_ports(self, is_svi=False):
nctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
if is_svi:
@ -10976,6 +11071,7 @@ class TestPortVlanNetwork(ApicAimTestCase):
def _test_topology_rpc(self, is_svi=False):
nctx = n_context.get_admin_context()
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
nets = []
epgs = []
@ -11134,6 +11230,7 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
return [str(dom['name']) for dom in domains]
def test_mixed_ports_on_network(self):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
self._register_agent('opflex-1', AGENT_CONF_OPFLEX)
@ -11167,6 +11264,7 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
self._validate()
def test_mixed_ports_on_network_with_default_domains(self):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
self.aim_mgr.create(aim_ctx,
aim_resource.VMMDomain(type='OpenStack',
@ -11399,6 +11497,7 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
aim_sg_rule1.remote_group_id, sg_rule['remote_group_id'])
def test_mixed_ports_on_network_with_specific_domains(self):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
hd_mapping = aim_infra.HostDomainMappingV2(host_name='opflex-1',
domain_name='vm1',
@ -11438,7 +11537,8 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
self._register_agent('opflex-3', AGENT_CONF_OPFLEX)
net1 = self._make_network(
self.fmt, 'net1', True,
arg_list=('provider:physical_network', 'provider:network_type'),
arg_list=('provider:physical_network',
'provider:network_type'),
**{'provider:physical_network': 'physnet3',
'provider:network_type': 'opflex'})['network']
epg1 = self._net_2_epg(net1)
@ -11464,7 +11564,8 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
with_type=False)))
# create another port on a host that belongs to the same domain
with self.port(subnet=sub1) as p1a:
p1a = self._bind_port_to_host(p1a['port']['id'], 'opflex-2a')
p1a = self._bind_port_to_host(p1a['port']['id'],
'opflex-2a')
epg1 = self.aim_mgr.get(aim_ctx, epg1)
self.assertEqual(set([('vm2', 'OpenStack')]),
set(self._doms(epg1.vmm_domains)))
@ -11487,7 +11588,8 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
self.assertEqual(set([]),
set(self._doms(epg1.physical_domains,
with_type=False)))
# create another port on a host that belongs to the wildcard domain
# create another port on a host that belongs to the
# wildcard domain
with self.port(subnet=sub1) as p3:
p3 = self._bind_port_to_host(p3['port']['id'], 'opflex-3')
epg1 = self.aim_mgr.get(aim_ctx, epg1)
@ -11649,6 +11751,7 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
self.assertIn(static_path_2, epg1.static_paths)
def test_no_host_domain_mappings(self):
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
self.aim_mgr.create(aim_ctx,
aim_resource.VMMDomain(type='OpenStack',
@ -11662,7 +11765,8 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
self._register_agent('opflex-2', AGENT_CONF_OPFLEX)
net1 = self._make_network(
self.fmt, 'net1', True,
arg_list=('provider:physical_network', 'provider:network_type'),
arg_list=('provider:physical_network',
'provider:network_type'),
**{'provider:physical_network': 'physnet3',
'provider:network_type': 'opflex'})['network']
epg1 = self._net_2_epg(net1)
@ -11712,6 +11816,7 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
def test_no_nat_external_net_default_domains(self):
# no-NAT external networks rely on port-binding
# to dynamically associate domains to the external EPG
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
domains = [{'type': 'OpenStack', 'name': 'cloud-rtp-1-VMM'},
{'type': 'OpenStack', 'name': 'vm1'}]
@ -11782,6 +11887,7 @@ class TestPortOnPhysicalNode(TestPortVlanNetwork):
def test_no_nat_external_net_specific_domains(self):
# no-NAT external networks rely on port-binding
# to dynamically associate domains to the external EPG
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
domains = [{'type': 'OpenStack', 'name': 'cloud-rtp-1-VMM'},
{'type': 'OpenStack', 'name': 'vm1'}]
@ -12100,6 +12206,7 @@ class TestPortOnPhysicalNodeSingleDriver(TestPortOnPhysicalNode):
expected_binding_info = None
physical_domain = 'phys1'
physical_network = 'physnet3'
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
hd_mapping = aim_infra.HostDomainMappingV2(host_name='*',
domain_name=physical_domain,
@ -12382,6 +12489,8 @@ class TestOpflexRpc(ApicAimTestCase):
port = self._make_port(self.fmt, net_id, fixed_ips=fixed_ips)['port']
port_id = port['id']
with db_api.CONTEXT_WRITER.using(self.db_session):
with self.db_session.begin():
self.driver._set_vm_name(self.db_session, 'someid', 'a name')
port = self._bind_port_to_host(port_id, host)['port']
self.assertEqual('ovs', port['binding:vif_type'])
@ -12420,6 +12529,7 @@ class TestOpflexRpc(ApicAimTestCase):
def _test_endpoint_details_bound_vlan_svi(self, apic_svi=False):
self._register_agent('h1', AGENT_CONF_OPFLEX)
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
hlink_1 = aim_infra.HostLink(
host_name='h1',
@ -12466,6 +12576,7 @@ class TestOpflexRpc(ApicAimTestCase):
def _test_endpoint_details_bound_trunk_no_subport(self, apic_svi=False):
self._register_agent('h1', AGENT_CONF_OPFLEX)
with db_api.CONTEXT_READER.using(self.db_session):
aim_ctx = aim_context.AimContext(self.db_session)
hlink_1 = aim_infra.HostLink(
host_name='h1',
@ -12536,6 +12647,7 @@ class TestOpflexRpc(ApicAimTestCase):
self.assertEqual('ovs', port['binding:vif_type'])
# Unbind the port, as if binding failed, leaving it bindable.
with db_api.CONTEXT_READER.using(self.db_session):
self.db_session.query(ml2_models.PortBinding).filter_by(
port_id=port['id']).update(
{'vif_type': portbindings.VIF_TYPE_BINDING_FAILED})

View File

@ -164,7 +164,9 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
ml2_options=ml2_opts, l3_plugin=l3_plugin,
sc_plugin=sc_plugin, qos_plugin=qos_plugin,
trunk_plugin=trunk_plugin)
self.db_session = db_api.get_writer_session()
ctx = nctx.get_admin_context()
with db_api.CONTEXT_WRITER.using(ctx):
self.db_session = ctx.session
self.initialize_db_config(self.db_session)
self.l3_plugin = directory.get_plugin(pconst.L3)
config.cfg.CONF.set_override('network_vlan_ranges',
@ -840,7 +842,7 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
def _get_nsp_ptg_fip_mapping(self, ptg_id):
ctx = nctx.get_admin_context()
with ctx.session.begin(subtransactions=True):
with db_api.CONTEXT_WRITER.using(ctx):
return (ctx.session.query(
nsp_manager.ServicePolicyPTGFipMapping).
filter_by(policy_target_group_id=ptg_id).
@ -1610,15 +1612,15 @@ class TestLegacyL3Policy(TestL3Policy):
aimd.AIMMappingDriver._create_per_l3p_implicit_contracts)
def _create_per_l3p_implicit_contracts(self):
session = nctx.get_admin_context().session
with session.begin(subtransactions=True):
ctx = nctx.get_admin_context()
with db_api.CONTEXT_WRITER.using(ctx):
l3p_db = group_policy_mapping_db.L3PolicyMapping(
id='1234', tenant_id='some_tenant',
name='test-l3p', description='test-desc',
ip_version=4, ip_pool='10.0.0.0/8',
subnet_prefix_length=24, shared=False)
session.add(l3p_db)
session.flush()
ctx.session.add(l3p_db)
ctx.session.flush()
orig_create_per_l3p_implicit_contracts(self)
aimd.AIMMappingDriver._create_per_l3p_implicit_contracts = (
orig_create_per_l3p_implicit_contracts)
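The legacy L3 policy hunk shows the add/flush variant of the writer pattern: the row is created and flushed inside the CONTEXT_WRITER block so that the code that follows in the same test can see it. A sketch under those assumptions, with model_cls standing in for group_policy_mapping_db.L3PolicyMapping and add_and_flush as a placeholder name.

from neutron_lib import context as nctx
from neutron_lib.db import api as db_api


def add_and_flush(model_cls, **attrs):
    ctx = nctx.get_admin_context()
    with db_api.CONTEXT_WRITER.using(ctx):
        row = model_cls(**attrs)
        ctx.session.add(row)
        # flush() issues the INSERT so the row is visible to queries made
        # later within the same transaction.
        ctx.session.flush()
        return row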
@ -2389,6 +2391,7 @@ class TestPolicyTargetGroupIpv4(AIMBaseTestCase):
num_address_families=len(list(self.ip_dict.keys())))
ptg_name = ptg['name']
with self.db_session.begin():
aim_epg_name = self.driver.apic_epg_name_for_policy_target_group(
self._neutron_context.session, ptg_id, ptg_name,
context=self._neutron_context)
@ -2406,7 +2409,8 @@ class TestPolicyTargetGroupIpv4(AIMBaseTestCase):
self._aim_context, aim_resource.BridgeDomain.from_dn(
net[DN][BD]))
aim_epgs = self.aim_mgr.find(
self._aim_context, aim_resource.EndpointGroup, name=aim_epg_name)
self._aim_context, aim_resource.EndpointGroup,
name=aim_epg_name)
self.assertEqual(1, len(aim_epgs))
self.assertEqual(aim_epg_name, aim_epgs[0].name)
self.assertEqual(aim_tenant_name, aim_epgs[0].tenant_name)
@ -2429,7 +2433,8 @@ class TestPolicyTargetGroupIpv4(AIMBaseTestCase):
self._neutron_context.session, ptg_id, new_name,
context=self._neutron_context)
aim_epgs = self.aim_mgr.find(
self._aim_context, aim_resource.EndpointGroup, name=aim_epg_name)
self._aim_context, aim_resource.EndpointGroup,
name=aim_epg_name)
self.assertEqual(1, len(aim_epgs))
self.assertEqual(aim_epg_name, aim_epgs[0].name)
self._validate_contracts(ptg, aim_epgs[0], new_prs_lists, l2p)
@ -2445,8 +2450,11 @@ class TestPolicyTargetGroupIpv4(AIMBaseTestCase):
# Explicitly created L2P should not be deleted
self.show_l2_policy(ptg['l2_policy_id'], expected_res_status=200)
# TODO(pulkit): replace with AIM reader context once API supports it.
with self.db_session.begin():
aim_epgs = self.aim_mgr.find(
self._aim_context, aim_resource.EndpointGroup, name=aim_epg_name)
self._aim_context, aim_resource.EndpointGroup,
name=aim_epg_name)
self.assertEqual(0, len(aim_epgs))
def _create_explicit_subnetpools(self):
@ -2949,6 +2957,8 @@ class TestGbpDetailsForML2(AIMBaseTestCase,
def test_get_gbp_details_pre_existing_vrf(self):
aim_ctx = aim_context.AimContext(self.db_session)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
vrf = self.aim_mgr.create(
aim_ctx, aim_resource.VRF(tenant_name='common', name='ctx1',
monitored=True))
@ -3624,6 +3634,8 @@ class TestPolicyTarget(AIMBaseTestCase,
def test_get_gbp_details_pre_existing_vrf(self):
aim_ctx = aim_context.AimContext(self.db_session)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
vrf = self.aim_mgr.create(
aim_ctx, aim_resource.VRF(tenant_name='common', name='ctx1',
monitored=True))
@ -3636,6 +3648,8 @@ class TestPolicyTarget(AIMBaseTestCase,
def test_get_gbp_details_no_pt_pre_existing_vrf(self):
aim_ctx = aim_context.AimContext(self.db_session)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
vrf = self.aim_mgr.create(
aim_ctx, aim_resource.VRF(tenant_name='common', name='ctx1',
monitored=True))
@ -5524,6 +5538,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
p1 = self._make_port(self.fmt, net['network']['id'],
device_owner='compute:')['port']
p1 = self._bind_port_to_host(p1['id'], 'host1')['port']
with self.db_session.begin():
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='host1')

View File

@ -101,14 +101,22 @@ class AimValidationTestMixin(object):
resource = copy.copy(resource)
# Make sure the AIM resource exists.
# TODO(pulkit): replace with AIM reader context once API supports it.
with self.db_session.begin():
actual_resource = self.aim_mgr.get(self.aim_ctx, resource)
self.assertIsNotNone(actual_resource)
# Only test deleting and modifying if not monitored.
if not actual_resource.monitored:
# Delete the AIM resource and test.
# TODO(pulkit): replace with AIM writer context once API
# supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, resource)
self._validate_repair_validate()
# TODO(pulkit): replace with AIM writer context once API
# supports it.
with self.db_session.begin():
self.assertTrue(
actual_resource.user_equal(
self.aim_mgr.get(self.aim_ctx, resource)))
@ -117,22 +125,33 @@ class AimValidationTestMixin(object):
self.aim_mgr.update(
self.aim_ctx, resource, display_name='not what it was')
self._validate_repair_validate()
# TODO(pulkit): replace with AIM reader context once API
# supports it.
with self.db_session.begin():
self.assertTrue(
actual_resource.user_equal(
self.aim_mgr.get(self.aim_ctx, resource)))
# Add unexpected AIM resource and test.
setattr(resource, unexpected_attr_name, unexpected_attr_value)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.create(self.aim_ctx, resource)
self._validate_repair_validate()
if test_unexpected_monitored:
# Add unexpected monitored AIM resource and test.
resource.monitored = True
# TODO(pulkit): replace with AIM writer context once API
# supports it.
with self.db_session.begin():
self.aim_mgr.create(self.aim_ctx, resource)
self._validate()
# Delete unexpected monitored AIM resource.
# TODO(pulkit): replace with AIM writer context once API
# supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, resource)
@ -175,6 +194,8 @@ class TestNeutronMapping(AimValidationTestCase):
# Delete the common Tenant and test.
tenant = aim_resource.Tenant(name='common')
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, tenant)
self._validate_repair_validate()
@ -316,6 +337,7 @@ class TestNeutronMapping(AimValidationTestCase):
self._validate()
# Delete the address scope's mapping record and test.
with self.db_session.begin():
(self.db_session.query(db.AddressScopeMapping).
filter_by(scope_id=scope4_id).
delete())
@ -336,6 +358,7 @@ class TestNeutronMapping(AimValidationTestCase):
self._test_aim_resource(vrf)
# Delete the initial address scope's mapping record and test.
with self.db_session.begin():
(self.db_session.query(db.AddressScopeMapping).
filter_by(scope_id=scope4_id).
delete())
@ -352,6 +375,7 @@ class TestNeutronMapping(AimValidationTestCase):
# test. Without this record, there is no way to know that the
# scopes were previously isomorphic, so they no longer will
# be isomorphic after repair.
with self.db_session.begin():
(self.db_session.query(db.AddressScopeMapping).
filter_by(scope_id=scope6_id).
delete())
@ -385,6 +409,7 @@ class TestNeutronMapping(AimValidationTestCase):
net = self._show('networks', net['id'])['network']
# Delete the network's mapping record and test.
with self.db_session.begin():
(self.db_session.query(db.NetworkMapping).
filter_by(network_id=net_id).
delete())
@ -431,6 +456,7 @@ class TestNeutronMapping(AimValidationTestCase):
# Add unexpected AIM Subnet if not external.
sn = self.driver.aim_mech_driver._map_subnet(
subnet, '10.0.2.1', bd)
with self.db_session.begin():
self.aim_mgr.create(self.aim_ctx, sn)
self._validate_repair_validate()
else:
@ -447,6 +473,7 @@ class TestNeutronMapping(AimValidationTestCase):
self._test_aim_resource(sn, 'gw_ip_mask', '10.0.3.1/24')
# Delete subnet extension data and test migration use case.
with self.db_session.begin():
(self.db_session.query(ext_db.SubnetExtensionDb).
filter_by(subnet_id=subnet_id).
delete())
@ -462,6 +489,8 @@ class TestNeutronMapping(AimValidationTestCase):
bd = aim_resource.BridgeDomain(tenant_name=tenant_name,
name='some_bd_name')
bd.monitored = True
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
bd = self.aim_mgr.create(self.aim_ctx, bd)
self._test_unrouted_network(preexisting_bd=bd)
@ -493,6 +522,7 @@ class TestNeutronMapping(AimValidationTestCase):
# REVISIT: We should consider supporting configuration file
# mappings of pre-existing BDs.
if not preexisting_bd:
with self.db_session.begin():
(self.db_session.query(ext_db.NetworkExtensionDb).
filter_by(network_id=net_id).
delete())
@ -503,6 +533,8 @@ class TestNeutronMapping(AimValidationTestCase):
# Create AIM HostDomainMappingV2.
hd_mapping = aim_infra.HostDomainMappingV2(
host_name='*', domain_name='vm2', domain_type='OpenStack')
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.create(self.aim_ctx, hd_mapping)
# Create external network.
@ -577,6 +609,8 @@ class TestNeutronMapping(AimValidationTestCase):
def test_preexisting_external_network(self):
# Create pre-existing AIM VRF.
vrf = aim_resource.VRF(tenant_name='common', name='v1', monitored=True)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.create(self.aim_ctx, vrf)
# Create pre-existing AIM L3Outside.
@ -586,12 +620,14 @@ class TestNeutronMapping(AimValidationTestCase):
# Create pre-existing AIM ExternalNetwork.
ext_net = aim_resource.ExternalNetwork(
tenant_name='common', l3out_name='l1', name='n1', monitored=True)
tenant_name='common', l3out_name='l1', name='n1',
monitored=True)
self.aim_mgr.create(self.aim_ctx, ext_net)
# Create pre-existing AIM ExternalSubnet.
ext_sn = aim_resource.ExternalSubnet(
tenant_name='common', l3out_name='l1', external_network_name='n1',
tenant_name='common', l3out_name='l1',
external_network_name='n1',
cidr='0.0.0.0/0', monitored=True)
self.aim_mgr.create(self.aim_ctx, ext_sn)
@ -601,6 +637,7 @@ class TestNeutronMapping(AimValidationTestCase):
# Delete network extension data and clear ExternalNetwork
# contracts to test migration use case.
with self.db_session.begin():
(self.db_session.query(ext_db.NetworkExtensionDb).
filter_by(network_id=net_id).
delete())
@ -634,10 +671,14 @@ class TestNeutronMapping(AimValidationTestCase):
self._test_network_attrs(net)
# Delete pre-existing AIM VRF and test.
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, vrf)
self._validate_unrepairable()
# Replace pre-existing AIM VRF and test.
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.create(self.aim_ctx, vrf)
self._validate()
@ -658,15 +699,21 @@ class TestNeutronMapping(AimValidationTestCase):
# that might break existing use cases.
# Delete pre-existing AIM L3Outside and test.
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, l3out)
self._validate_repair_validate()
# Delete pre-existing AIM ExternalNetwork, along with its
# child ExternalSubnet, and test.
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, ext_net, cascade=True)
self._validate_repair_validate()
# Delete just the pre-existing AIM ExternalSubnet and test.
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, ext_sn)
self._validate_repair_validate()
@ -782,6 +829,7 @@ class TestNeutronMapping(AimValidationTestCase):
self._test_aim_resource(esn, 'cidr', '1.2.3.4/0')
# Delete the CloneL3Out record and test.
with self.db_session.begin():
(self.db_session.query(aim_lib_model.CloneL3Out).
filter_by(tenant_name=tenant_name, name=l3out_name).
delete())
@ -1030,16 +1078,19 @@ class TestNeutronMapping(AimValidationTestCase):
# Change port binding level to unknown mechanism driver and
# test.
with self.db_session.begin():
self.db_session.query(ml2_models.PortBindingLevel).filter_by(
port_id=port['id'], level=0).update({'driver': 'xxx'})
self._validate_repair_validate()
# Change port binding level to incorrect host and test.
with self.db_session.begin():
self.db_session.query(ml2_models.PortBindingLevel).filter_by(
port_id=port['id'], level=0).update({'host': 'yyy'})
self._validate_repair_validate()
# Change port binding level to null segment ID and test.
with self.db_session.begin():
self.db_session.query(ml2_models.PortBindingLevel).filter_by(
port_id=port['id'], level=0).update({'segment_id': None})
self._validate_repair_validate()
@ -1051,6 +1102,7 @@ class TestNeutronMapping(AimValidationTestCase):
# dynamic segment whenever there is no agent on the port's
# host, which is probably wrong, but it does fail to bind, so
# this test succeeds.
with self.db_session.begin():
self.db_session.query(ml2_models.PortBindingLevel).filter_by(
port_id=port['id'], level=0).update({'driver': 'xxx',
'host': 'yyy'})
@ -1076,6 +1128,8 @@ class TestNeutronMapping(AimValidationTestCase):
host1_dn = 'topology/pod-1/protpaths-101-102/pathep-[%s]' % host1_pg
self.hlink1 = aim_infra.HostLink(
host_name='host1', interface_name='eth0', path=host1_dn)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.create(self.aim_ctx, self.hlink1)
acc_bundle = aim_resource.InfraAccBundleGroup(name=host1_pg,
monitored=True)
@ -1092,6 +1146,8 @@ class TestNeutronMapping(AimValidationTestCase):
# Delete source group from AIM, and verify that it
# can be repaired.
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
source_groups = self.aim_mgr.find(self.aim_ctx,
aim_resource.SpanVsourceGroup)
self.aim_mgr.delete(self.aim_ctx, source_groups[0])
@ -1117,6 +1173,7 @@ class TestNeutronMapping(AimValidationTestCase):
epg_dn = net['apic:distinguished_names']['EndpointGroup']
# Delete the network's mapping record and test.
with self.db_session.begin():
(self.db_session.query(db.NetworkMapping).
filter_by(network_id=net_id).
delete())
@ -1136,6 +1193,8 @@ class TestNeutronMapping(AimValidationTestCase):
# setting scope to security group but
# should validate common tenant resources
tenant = aim_resource.Tenant(name='common')
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, tenant)
self._validate_repair_validate_scoped(["security_group"], None)
@ -1146,10 +1205,14 @@ class TestNeutronMapping(AimValidationTestCase):
epg_dn1 = net1['apic:distinguished_names']['EndpointGroup']
bd1 = aim_resource.BridgeDomain.from_dn(bd_dn1)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, bd1)
# delete EndpointGroup.
epg1 = aim_resource.EndpointGroup.from_dn(epg_dn1)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, epg1)
net_resp2 = self._make_network(
@ -1159,10 +1222,14 @@ class TestNeutronMapping(AimValidationTestCase):
epg_dn2 = net2['apic:distinguished_names']['EndpointGroup']
bd2 = aim_resource.BridgeDomain.from_dn(bd_dn2)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, bd2)
# delete EndpointGroup.
epg2 = aim_resource.EndpointGroup.from_dn(epg_dn2)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, epg2)
self._validate_repair_validate_scoped(None, ['prj_ten_1'])
self._validate_repair_validate_scoped(None, ['prj_ten_2'])
@ -1184,6 +1251,8 @@ class TestNeutronMapping(AimValidationTestCase):
aim_sg = aim_resource.SecurityGroup(
name=sg_name, tenant_name=tenant_name)
self._test_aim_resource(aim_sg)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, aim_sg)
# Test the AIM SecurityGroupSubject.
@ -1191,6 +1260,8 @@ class TestNeutronMapping(AimValidationTestCase):
name='default', security_group_name=sg_name,
tenant_name=tenant_name)
self._test_aim_resource(aim_subject)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, aim_subject)
# Test the AIM SecurityGroupRule.
@ -1200,10 +1271,14 @@ class TestNeutronMapping(AimValidationTestCase):
security_group_name=sg_name,
tenant_name=tenant_name)
self._test_aim_resource(aim_rule)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, aim_rule)
aim_tenant = aim_resource.Tenant(name=tenant_name)
self._test_aim_resource(aim_tenant)
# TODO(pulkit): replace with AIM writer context once API supports it.
with self.db_session.begin():
self.aim_mgr.delete(self.aim_ctx, aim_tenant)
self._validate_repair_validate_scoped(None, [tenant_name])
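The repeated TODO(pulkit) comments mark an interim pattern: until the AIM API accepts reader/writer contexts directly, direct aim_mgr reads and writes in these validation tests are wrapped in a plain session transaction. A sketch of that wrapper, with db_session, aim_mgr and aim_ctx standing in for the test-class fixtures and aim_delete as a placeholder name.

def aim_delete(db_session, aim_mgr, aim_ctx, resource):
    # Plain session transaction around the AIM write; per the TODOs above,
    # this is meant to become an AIM writer context once supported.
    with db_session.begin():
        aim_mgr.delete(aim_ctx, resource)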

View File

@ -1,5 +1,5 @@
[tox]
envlist = py36,py37,pep8
envlist = py36,py37,pep8,py38
minversion = 3.2.0
skipsdist = True
ignore_basepython_conflict = True