diff --git a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_db.py b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_db.py index c557c3edd..714b9280d 100644 --- a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_db.py +++ b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/extension_db.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +from collections import defaultdict + from neutron.db import models_v2 from neutron_lib.db import model_base import sqlalchemy as sa @@ -325,6 +327,24 @@ class ExtensionDbMixin(object): [c['contract_name'] for c in db_contracts if not c['provides']]} + def get_router_extn_db_bulk(self, session, router_ids): + # Baked queries using in_ require sqlalchemy >=1.2. + db_contracts = (session.query(RouterExtensionContractDb).filter( + RouterExtensionContractDb.router_id.in_(router_ids)).all()) + + attr_dict = defaultdict(dict) + for db_contract in db_contracts: + router_id = db_contract['router_id'] + p_contracts = attr_dict[router_id].setdefault( + cisco_apic_l3.EXTERNAL_PROVIDED_CONTRACTS, []) + c_contracts = attr_dict[router_id].setdefault( + cisco_apic_l3.EXTERNAL_CONSUMED_CONTRACTS, []) + if db_contract['provides']: + p_contracts.append(db_contract['contract_name']) + else: + c_contracts.append(db_contract['contract_name']) + return attr_dict + def _update_list_attr(self, session, db_model, column, new_values, **filters): if new_values is None: diff --git a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py index fa7fc8e73..e13fe673a 100644 --- a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py +++ b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py @@ -1586,25 +1586,79 @@ class ApicMechanismDriver(api_plus.MechanismDriver, self.aim.delete(aim_ctx, subject) self.aim.delete(aim_ctx, contract) - def extend_router_dict(self, session, router_db, result): - if result.get(api_plus.BULK_EXTENDED): - return - LOG.debug("APIC AIM MD extending dict for router: %s", result) + def extend_router_dict_bulk(self, session, results): + LOG.debug("APIC AIM MD extending dict bulk for router: %s", + results) - sync_state = cisco_apic.SYNC_SYNCED - dist_names = {} + # Gather db objects aim_ctx = aim_context.AimContext(session) + aim_resources_aggregate = [] + res_dict_by_aim_res_dn = {} + # template to track the status related info + # for each resource. + aim_status_track_template = { + SYNC_STATE_TMP: cisco_apic.SYNC_NOT_APPLICABLE, + AIM_RESOURCES_CNT: 0} - contract, subject = self._map_router(session, router_db) + for res_dict in results: + aim_resources = [] + res_dict[cisco_apic.SYNC_STATE] = cisco_apic.SYNC_NOT_APPLICABLE + # Use a tmp field to aggregate the status across mapped + # AIM objects, we set the actual sync_state only if we + # are able to process all the status objects for these + # corresponding AIM resources. If any status object is not + # available then sync_state will be 'build'. On create, + # subnets start in 'N/A'. The tracking object is added + # along with the res_dict on the DN based res_dict_by_aim_res_dn + # dict which maintains the mapping from status objs to res_dict. + aim_status_track = copy.deepcopy(aim_status_track_template) - dist_names[a_l3.CONTRACT] = contract.dn - sync_state = self._merge_status(aim_ctx, sync_state, contract) + res_dict[cisco_apic.DIST_NAMES] = {} + res_dict_and_aim_status_track = (res_dict, aim_status_track) + dist_names = res_dict[cisco_apic.DIST_NAMES] - dist_names[a_l3.CONTRACT_SUBJECT] = subject.dn - sync_state = self._merge_status(aim_ctx, sync_state, subject) + contract, subject = self._map_router(session, res_dict) + dist_names[a_l3.CONTRACT] = contract.dn + aim_resources.append(contract) + res_dict_by_aim_res_dn[contract.dn] = res_dict_and_aim_status_track + dist_names[a_l3.CONTRACT_SUBJECT] = subject.dn + aim_resources.append(subject) + res_dict_by_aim_res_dn[subject.dn] = res_dict_and_aim_status_track - result[cisco_apic.DIST_NAMES] = dist_names - result[cisco_apic.SYNC_STATE] = sync_state + # Track the number of AIM resources in aim_status_track, + # decrement count each time we process a status obj related to + # the resource. If the count hits zero then we have processed + # the status objs for all of the associated AIM resources. Until + # this happens, the sync_state is held as 'build' (unless it has + # to be set to 'error'). + aim_status_track[AIM_RESOURCES_CNT] = len(aim_resources) + aim_resources_aggregate.extend(aim_resources) + + # Merge statuses + for status in self.aim.get_statuses(aim_ctx, aim_resources_aggregate): + res_dict, aim_status_track = res_dict_by_aim_res_dn.get( + status.resource_dn, ({}, {})) + if res_dict and aim_status_track: + aim_status_track[SYNC_STATE_TMP] = self._merge_status( + aim_ctx, + aim_status_track.get(SYNC_STATE_TMP, + cisco_apic.SYNC_NOT_APPLICABLE), + None, status=status) + aim_status_track[AIM_RESOURCES_CNT] -= 1 + if (aim_status_track[AIM_RESOURCES_CNT] == 0 or + (aim_status_track[SYNC_STATE_TMP] is + cisco_apic.SYNC_ERROR)): + # if this is zero then all the AIM resources corresponding, + # to this neutron resource are processed and we can + # accurately reflect the actual sync_state. Anytime we + # encounter an error - we reflect that immediately even + # if we are not done with the AIM resources processing. + res_dict[cisco_apic.SYNC_STATE] = ( + aim_status_track[SYNC_STATE_TMP]) + + def extend_router_dict(self, session, router_db, result): + LOG.debug("APIC AIM MD extending dict for router: %s", result) + self.extend_router_dict_bulk(session, [result]) def add_router_interface(self, context, router, port, subnets): LOG.debug("APIC AIM MD adding subnets %(subnets)s to router " diff --git a/gbpservice/neutron/services/apic_aim/l3_plugin.py b/gbpservice/neutron/services/apic_aim/l3_plugin.py index 37db62569..2c2524864 100644 --- a/gbpservice/neutron/services/apic_aim/l3_plugin.py +++ b/gbpservice/neutron/services/apic_aim/l3_plugin.py @@ -14,6 +14,7 @@ # under the License. from neutron.api import extensions +from neutron.db import api as db_api from neutron.db import common_db_mixin from neutron.db import db_base_plugin_v2 from neutron.db import dns_db @@ -34,10 +35,13 @@ from gbpservice._i18n import _LE from gbpservice._i18n import _LI from gbpservice.neutron import extensions as extensions_pkg from gbpservice.neutron.extensions import cisco_apic_l3 as l3_ext +from gbpservice.neutron.plugins.ml2plus import driver_api as api_plus from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import ( extension_db as extn_db) from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import ( mechanism_driver as md) +from gbpservice.neutron.plugins.ml2plus import patch_neutron # noqa + LOG = logging.getLogger(__name__) @@ -78,6 +82,9 @@ class ApicL3Plugin(common_db_mixin.CommonDbMixin, return self._mechanism_driver def _extend_router_dict_apic(self, router_res, router_db): + if router_res.get(api_plus.BULK_EXTENDED): + return + LOG.debug("APIC AIM L3 Plugin extending router dict: %s", router_res) session = inspect(router_db).session try: @@ -90,6 +97,56 @@ class ApicL3Plugin(common_db_mixin.CommonDbMixin, db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs( l3.ROUTERS, ['_extend_router_dict_apic']) + def _extend_router_dict_bulk_apic(self, routers, _): + LOG.debug("APIC AIM L3 Plugin bulk extending router dict: %s", + routers) + if not routers: + return + session = patch_neutron.get_current_session() + try: + self._md.extend_router_dict_bulk(session, routers) + self._include_router_extn_attr_bulk(session, routers) + except Exception: + with excutils.save_and_reraise_exception(): + LOG.exception("APIC AIM _extend_router_dict_bulk failed") + + db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs( + l3.ROUTERS + '_BULK', ['_extend_router_dict_bulk_apic']) + + def _make_router_dict_nop(self, router, fields=None, + process_extensions=False): + return router + + def _make_routers_dict(self, routers, fields=None): + results = [] + for router in routers: + res = self._make_router_dict(router, fields, + process_extensions=False) + res[api_plus.BULK_EXTENDED] = True + self._apply_dict_extend_functions(l3.ROUTERS, res, router) + res.pop(api_plus.BULK_EXTENDED, None) + results.append(res) + + self._apply_dict_extend_functions(l3.ROUTERS + '_BULK', + results, None) + return results + + # Overwrite the upstream implementation to take advantage + # of the bulk extension support. + @db_api.retry_if_session_inactive() + def get_routers(self, context, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + marker_obj = self._get_marker_obj(context, 'router', limit, marker) + routers_db = self._get_collection(context, l3_db.Router, + self._make_router_dict_nop, + filters=filters, fields=fields, + sorts=sorts, + limit=limit, + marker_obj=marker_obj, + page_reverse=page_reverse) + return self._make_routers_dict(routers_db, fields) + def create_router(self, context, router): LOG.debug("APIC AIM L3 Plugin creating router: %s", router) self._md.ensure_tenant(context, router['router']['tenant_id']) @@ -146,6 +203,14 @@ class ApicL3Plugin(common_db_mixin.CommonDbMixin, attr = self.get_router_extn_db(session, router['id']) router.update(attr) + def _include_router_extn_attr_bulk(self, session, routers): + router_ids = [router['id'] for router in routers] + attr_dict = self.get_router_extn_db_bulk(session, router_ids) + for router in routers: + router.update(attr_dict[router['id']] if router['id'] in attr_dict + else {l3_ext.EXTERNAL_PROVIDED_CONTRACTS: [], + l3_ext.EXTERNAL_CONSUMED_CONTRACTS: []}) + def add_router_interface(self, context, router_id, interface_info): LOG.debug("APIC AIM L3 Plugin adding interface %(interface)s " "to router %(router)s", diff --git a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py index 26f3264f7..8a70f899e 100644 --- a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py +++ b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py @@ -451,6 +451,49 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase, self.assertIsNotNone(sg_rule) return sg_rule + def _get_contract(self, contract_name, tenant_name): + session = db_api.get_reader_session() + aim_ctx = aim_context.AimContext(session) + contract = aim_resource.Contract(tenant_name=tenant_name, + name=contract_name) + contract = self.aim_mgr.get(aim_ctx, contract) + self.assertIsNotNone(contract) + return contract + + def _get_subject(self, subject_name, contract_name, tenant_name): + session = db_api.get_reader_session() + aim_ctx = aim_context.AimContext(session) + subject = aim_resource.ContractSubject(tenant_name=tenant_name, + contract_name=contract_name, + name=subject_name) + subject = self.aim_mgr.get(aim_ctx, subject) + self.assertIsNotNone(subject) + return subject + + def _check_router(self, router): + dns = copy.copy(router.get(DN)) + aname = self.name_mapper.router(None, router['id']) + + aim_contract = self._get_contract(aname, 'common') + self.assertEqual('common', aim_contract.tenant_name) + self.assertEqual(aname, aim_contract.name) + self.assertEqual(router['name'], aim_contract.display_name) + self.assertEqual('context', aim_contract.scope) # REVISIT(rkukura) + self._check_dn_is_resource(dns, 'Contract', aim_contract) + + aim_subject = self._get_subject('route', aname, 'common') + self.assertEqual('common', aim_subject.tenant_name) + self.assertEqual(aname, aim_subject.contract_name) + self.assertEqual('route', aim_subject.name) + self.assertEqual(router['name'], aim_subject.display_name) + self.assertEqual([], aim_subject.in_filters) + self.assertEqual([], aim_subject.out_filters) + self.assertEqual([self.driver.apic_system_id + '_AnyFilter'], + aim_subject.bi_filters) + self._check_dn_is_resource(dns, 'ContractSubject', aim_subject) + + self.assertFalse(dns) + def _sg_rule_should_not_exist(self, sg_rule_name): session = db_api.get_session() aim_ctx = aim_context.AimContext(session) @@ -723,15 +766,6 @@ class TestAimMapping(ApicAimTestCase): name=epg_name) self.assertEqual([], epgs) - def _get_contract(self, contract_name, tenant_name): - session = db_api.get_session() - aim_ctx = aim_context.AimContext(session) - contract = aim_resource.Contract(tenant_name=tenant_name, - name=contract_name) - contract = self.aim_mgr.get(aim_ctx, contract) - self.assertIsNotNone(contract) - return contract - def _contract_should_not_exist(self, contract_name): session = db_api.get_session() aim_ctx = aim_context.AimContext(session) @@ -739,16 +773,6 @@ class TestAimMapping(ApicAimTestCase): name=contract_name) self.assertEqual([], contracts) - def _get_subject(self, subject_name, contract_name, tenant_name): - session = db_api.get_session() - aim_ctx = aim_context.AimContext(session) - subject = aim_resource.ContractSubject(tenant_name=tenant_name, - contract_name=contract_name, - name=subject_name) - subject = self.aim_mgr.get(aim_ctx, subject) - self.assertIsNotNone(subject) - return subject - def _subject_should_not_exist(self, subject_name, contract_name): session = db_api.get_session() aim_ctx = aim_context.AimContext(session) @@ -1022,30 +1046,6 @@ class TestAimMapping(ApicAimTestCase): else: self.assertEqual(aim_sg_rule.icmp_type, 'unspecified') - def _check_router(self, router): - dns = copy.copy(router.get(DN)) - aname = self.name_mapper.router(None, router['id']) - - aim_contract = self._get_contract(aname, 'common') - self.assertEqual('common', aim_contract.tenant_name) - self.assertEqual(aname, aim_contract.name) - self.assertEqual(router['name'], aim_contract.display_name) - self.assertEqual('context', aim_contract.scope) # REVISIT(rkukura) - self._check_dn_is_resource(dns, 'Contract', aim_contract) - - aim_subject = self._get_subject('route', aname, 'common') - self.assertEqual('common', aim_subject.tenant_name) - self.assertEqual(aname, aim_subject.contract_name) - self.assertEqual('route', aim_subject.name) - self.assertEqual(router['name'], aim_subject.display_name) - self.assertEqual([], aim_subject.in_filters) - self.assertEqual([], aim_subject.out_filters) - self.assertEqual([self.driver.apic_system_id + '_AnyFilter'], - aim_subject.bi_filters) - self._check_dn_is_resource(dns, 'ContractSubject', aim_subject) - - self.assertFalse(dns) - def _check_router_deleted(self, router): aname = self.name_mapper.router(None, router['id']) self._subject_should_not_exist('route', aname) @@ -5188,11 +5188,6 @@ class TestExtensionAttributes(ApicAimTestCase): self.assertEqual(['p1', 'p2'], sorted(rtr1[PROV])) self.assertEqual(['k'], rtr1[CONS]) - # delete - self._delete('routers', rtr1['id']) - self.assertEqual({PROV: [], CONS: []}, - extn.get_router_extn_db(session, rtr1['id'])) - # Simulate a prior existing router (i.e. no extension attrs exist) rtr2 = self._make_router(self.fmt, 'test-tenant', 'router2', arg_list=self.extension_attributes, @@ -5217,6 +5212,26 @@ class TestExtensionAttributes(ApicAimTestCase): self.assertEqual(['p1', 'p2'], sorted(rtr2[PROV])) self.assertEqual([], rtr2[CONS]) + # Test the full list which will invoke the bulk extension + rtrs = self._list('routers')['routers'] + self.assertEqual(3, len(rtrs)) + for rtr in rtrs: + self._check_router(rtr) + if rtr['id'] == rtr0['id']: + self.assertEqual([], sorted(rtr[PROV])) + self.assertEqual([], rtr[CONS]) + elif rtr['id'] == rtr1['id']: + self.assertEqual(['p1', 'p2'], sorted(rtr1[PROV])) + self.assertEqual(['k'], rtr1[CONS]) + elif rtr['id'] == rtr2['id']: + self.assertEqual(['p1', 'p2'], sorted(rtr[PROV])) + self.assertEqual([], rtr[CONS]) + + # delete + self._delete('routers', rtr1['id']) + self.assertEqual({PROV: [], CONS: []}, + extn.get_router_extn_db(session, rtr1['id'])) + def test_address_scope_lifecycle(self): session = db_api.get_session() aim_ctx = aim_context.AimContext(db_session=session)