[AIM] Dispatch Opflex RPCs directly to the mechanism driver

The ML2Plus plugin now overrides ML2's implementation of
start_rpc_listeners() so that it also calls similarly named functions
on registered mechanism drivers. The apic_aim mechanism driver's
start_rpc_listeners() method starts the topology RPC listener (which
was previously started even if not in an RPC worker), and, if enabled,
an RPC listener for the new Opflex RPC implementation. The aim_mapping
policy driver no longer starts its Opflex RPC listener if the new RPC
implementation is enabled. RPC initialization code is consolidated
into the mechanism driver's rpc module.

Change-Id: I685a55248cb17b5e2351805beae6cbd7ab2e8830
This commit is contained in:
Robert Kukura 2019-02-20 19:45:39 -05:00
parent 6d09dbfddd
commit 44e1913304
9 changed files with 149 additions and 30 deletions

View File

@ -152,6 +152,15 @@ class SecurityGroupRuleContext(object):
@six.add_metaclass(abc.ABCMeta)
class MechanismDriver(driver_api.MechanismDriver):
def start_rpc_listeners(self):
    """Start driver-specific RPC listeners.

    If any driver-specific RPC listeners are needed, create an RPC
    connection, create the consumers, call consume_in_threads() on
    the connection, and return the resulting list of servers.

    :returns: list of RPC servers started by this driver; empty by
        default for drivers that need no listeners.
    """
    return []
# REVISIT(rkukura): Is this needed for all operations, or just for
# create operations? If its needed for all operations, should the
# method be specific to the resource and operation, and include

View File

@ -63,7 +63,6 @@ from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from neutron_lib.utils import net
from opflexagent import constants as ofcst
from opflexagent import host_agent_rpc as arpc
from opflexagent import rpc as ofrpc
from oslo_config import cfg
from oslo_db import exception as db_exc
@ -205,26 +204,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
rpc.ApicRpcHandlerMixin):
NIC_NAME_LEN = 14
class TopologyRpcEndpoint(object):
    """RPC endpoint for APIC topology (host link) notifications."""

    target = oslo_messaging.Target(version=arpc.VERSION)

    def __init__(self, mechanism_driver):
        # Link updates are delegated to the mechanism driver.
        self.md = mechanism_driver

    @db_api.retry_if_session_inactive()
    def update_link(self, context, *args, **kwargs):
        # The handler writes to the DB, so attach a writer session.
        context._session = db_api.get_writer_session()
        handler = self.md.update_link
        return handler(context, *args, **kwargs)

    @db_api.retry_if_session_inactive()
    def delete_link(self, context, *args, **kwargs):
        # Intentionally a no-op, to tolerate situations such as fabric
        # upgrades or flapping links. Old links are removed once a
        # specific host is attached somewhere else. To completely
        # decommission the host, aimctl can be used to clean up the
        # hostlink table.
        return
def __init__(self):
LOG.info("APIC AIM MD __init__")
@ -250,12 +229,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.apic_system_id = cfg.CONF.apic_system_id
self.notifier = ofrpc.AgentNotifierApi(n_topics.AGENT)
self.sg_enabled = securitygroups_rpc.is_firewall_enabled()
# setup APIC topology RPC handler
self.topology_conn = n_rpc.create_connection()
self.topology_conn.create_consumer(arpc.TOPIC_APIC_SERVICE,
[self.TopologyRpcEndpoint(self)],
fanout=False)
self.topology_conn.consume_in_threads()
self.keystone_notification_exchange = (cfg.CONF.ml2_apic_aim.
keystone_notification_exchange)
self.keystone_notification_topic = (cfg.CONF.ml2_apic_aim.
@ -284,6 +257,10 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.apic_router_id_pool = cfg.CONF.ml2_apic_aim.apic_router_id_pool
self.apic_router_id_subnet = netaddr.IPSet([self.apic_router_id_pool])
def start_rpc_listeners(self):
    """Start this driver's RPC listeners and return the servers."""
    LOG.info("APIC AIM MD starting RPC listeners")
    servers = self._start_rpc_listeners()
    return servers
def _setup_nova_vm_update(self):
self.admin_context = nctx.get_admin_context()
self.host_id = 'id-%s' % net.get_hostname()

View File

@ -19,6 +19,7 @@ import netaddr
import sqlalchemy as sa
from sqlalchemy.ext import baked
from neutron.common import rpc as n_rpc
from neutron.db import api as db_api
from neutron.db.extra_dhcp_opt import models as dhcp_models
from neutron.db.models import allowed_address_pair as aap_models
@ -33,7 +34,10 @@ from neutron.services.trunk import models as trunk_models
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as n_constants
from neutron_lib import context as n_context
from opflexagent import host_agent_rpc as oa_rpc
from opflexagent import rpc as o_rpc
from oslo_log import log
import oslo_messaging
from oslo_serialization import jsonutils
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import constants
@ -151,8 +155,49 @@ EndpointTrunkInfo = namedtuple(
'segmentation_id'])
class TopologyRpcEndpoint(object):
    """RPC endpoint for APIC topology (host link) notifications."""

    target = oslo_messaging.Target(version=oa_rpc.VERSION)

    def __init__(self, mechanism_driver):
        # Link updates are delegated to the mechanism driver.
        self.md = mechanism_driver

    @db_api.retry_if_session_inactive()
    def update_link(self, context, *args, **kwargs):
        # The handler writes to the DB, so attach a writer session.
        handler = self.md.update_link
        context._session = db_api.get_writer_session()
        return handler(context, *args, **kwargs)

    @db_api.retry_if_session_inactive()
    def delete_link(self, context, *args, **kwargs):
        # Intentionally a no-op, to tolerate situations such as fabric
        # upgrades or flapping links. Old links are removed once a
        # specific host is attached somewhere else. To completely
        # decommission the host, aimctl can be used to clean up the
        # hostlink table.
        return
class ApicRpcHandlerMixin(object):
def _start_rpc_listeners(self):
    """Create the driver's RPC consumers and return the servers."""
    connection = n_rpc.create_connection()

    # Consumer for the new Opflex RPC implementation, created only
    # when that implementation is enabled.
    if self.enable_new_rpc:
        endpoints = [o_rpc.GBPServerRpcCallback(self, self.notifier)]
        connection.create_consumer(
            o_rpc.TOPIC_OPFLEX, endpoints, fanout=False)

    # Consumer for APIC topology (host link) RPCs.
    connection.create_consumer(
        oa_rpc.TOPIC_APIC_SERVICE, [TopologyRpcEndpoint(self)],
        fanout=False)

    # Spawn the listener threads and hand the resulting servers back
    # to the caller so the RPC worker can manage them.
    return connection.consume_in_threads()
# The following five methods handle RPCs from the Opflex agent.
#
# REVISIT: These handler methods are currently called by
@ -232,7 +277,12 @@ class ApicRpcHandlerMixin(object):
# implementation from get_vrf_details() to this method.
return self.get_vrf_details(context, kwargs)
# REVISIT: def ip_address_owner_update(self, context, **kwargs):
def ip_address_owner_update(self, context, **kwargs):
    """Handle an ip_address_owner_update RPC from the Opflex agent."""
    LOG.debug("APIC AIM MD handling ip_address_owner_update for: %s",
              kwargs)

    # REVISIT: Move actual handler implementation to this class.
    driver = self.gbp_driver
    if driver:
        driver.ip_address_owner_update(context, **kwargs)
@db_api.retry_if_session_inactive()
def _get_vrf_details(self, context, vrf_id):

View File

@ -93,6 +93,13 @@ class MechanismManager(managers.MechanismManager):
errors=errors
)
def start_rpc_listeners(self):
    """Collect RPC servers started by registered mechanism drivers.

    Only drivers implementing the extended (ml2plus) MechanismDriver
    API are asked to start listeners.
    """
    return [server
            for driver in self.ordered_mech_drivers
            if isinstance(driver.obj, driver_api.MechanismDriver)
            for server in driver.obj.start_rpc_listeners()]
def ensure_tenant(self, plugin_context, tenant_id):
for driver in self.ordered_mech_drivers:
if isinstance(driver.obj, driver_api.MechanismDriver):

View File

@ -137,6 +137,11 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin,
self._verify_service_plugins_requirements()
LOG.info("Modular L2 Plugin (extended) initialization complete")
def start_rpc_listeners(self):
    """Return ML2's RPC servers plus any started by mechanism drivers."""
    base_servers = super(Ml2PlusPlugin, self).start_rpc_listeners()
    driver_servers = self.mechanism_manager.start_rpc_listeners()
    return base_servers + driver_servers
def _handle_security_group_change(self, resource, event, trigger,
**kwargs):
if 'payload' in kwargs:

View File

@ -159,6 +159,8 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
@log.log_method_call
def start_rpc_listeners(self):
    """Start Opflex RPC listeners, unless the MD already owns them.

    When the new RPC implementation is enabled, the apic_aim
    mechanism driver starts the Opflex listeners itself, so this
    policy driver starts none.
    """
    if not self.aim_mech_driver.enable_new_rpc:
        return self.setup_opflex_rpc_listeners()
    return []
def validate_state(self, repair):

View File

@ -32,6 +32,10 @@ class LoggerPlusMechanismDriver(driver_api.MechanismDriver,
def initialize(self):
    # Log-only implementation; this driver exists to trace calls.
    LOG.info("initialize called")
def start_rpc_listeners(self):
    # Log the call and start no listeners.
    LOG.info("start_rpc_listeners called")
    return []
def ensure_tenant(self, plugin_context, tenant_id):
    # Log-only implementation; this driver exists to trace calls.
    LOG.info("ensure_tenant called with tenant_id %s", tenant_id)

View File

@ -285,7 +285,6 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
self.saved_keystone_client = ksc_client.Client
ksc_client.Client = FakeKeystoneClient
self.plugin = directory.get_plugin()
self.plugin.start_rpc_listeners()
self.driver = self.plugin.mechanism_manager.mech_drivers[
'apic_aim'].obj
self.l3_plugin = directory.get_plugin(n_constants.L3)
@ -471,6 +470,37 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
return verify
class TestRpcListeners(ApicAimTestCase):
    """Verify which RPC topics the plugin's listeners consume."""

    @staticmethod
    def _consume_in_threads(self):
        # Stand-in for Connection.consume_in_threads() that returns
        # the connection's servers without actually starting them.
        return self.servers

    # REVISIT: Remove new_rpc option with old RPC cleanup.
    def _test_start_rpc_listeners(self, new_rpc):
        # Replace the mock installed by
        # neutron.tests.base.BaseTestCase.setup_rpc_mocks(), so that
        # servers are returned but still never started.
        patched = mock.patch(
            'neutron.common.rpc.Connection.consume_in_threads',
            TestRpcListeners._consume_in_threads)
        with patched:
            # The apic_aim MD's RPC servers must be among those
            # returned by the plugin.
            listeners = self.plugin.start_rpc_listeners()
            consumed = [listener._target.topic for listener in listeners]
        self.assertIn('apic-service', consumed)
        if new_rpc:
            self.assertIn('opflex', consumed)
        else:
            self.assertNotIn('opflex', consumed)

    def test_start_rpc_listeners(self):
        self.driver.enable_new_rpc = False
        self._test_start_rpc_listeners(False)

    def test_start_rpc_listeners_new_rpc(self):
        self.driver.enable_new_rpc = True
        self._test_start_rpc_listeners(True)
class TestAimMapping(ApicAimTestCase):
def setUp(self):
self.call_wrapper = CallRecordWrapper()

View File

@ -17,6 +17,8 @@ import mock
import testtools
from neutron.api import extensions
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.conf.plugins.ml2 import config # noqa
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.tests.unit.api import test_extensions
@ -57,7 +59,6 @@ class Ml2PlusPluginV2TestCase(test_address_scope.AddressScopeTestCase):
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.port_create_status = 'DOWN'
self.plugin = directory.get_plugin()
self.plugin.start_rpc_listeners()
def exist_checker(self, getter):
def verify(context):
@ -67,6 +68,40 @@ class Ml2PlusPluginV2TestCase(test_address_scope.AddressScopeTestCase):
return verify
class TestRpcListeners(Ml2PlusPluginV2TestCase):
    """Verify the plugin aggregates RPC servers from ML2 and drivers."""

    @staticmethod
    def _consume_in_threads(self):
        # Stand-in for Connection.consume_in_threads() that returns
        # the connection's servers without actually starting them.
        return self.servers

    @staticmethod
    def _start_rpc_listeners(self):
        # Fake driver implementation that consumes a test-only topic.
        connection = n_rpc.create_connection()
        connection.create_consumer('q-test-topic', [])
        return connection.consume_in_threads()

    def test_start_rpc_listeners(self):
        # Replace the mock installed by
        # neutron.tests.base.BaseTestCase.setup_rpc_mocks(), so that
        # servers are returned but still never started, and have the
        # logger MD start an RPC listener of its own.
        consume_patch = mock.patch(
            'neutron.common.rpc.Connection.consume_in_threads',
            TestRpcListeners._consume_in_threads)
        listener_patch = mock.patch(
            'gbpservice.neutron.tests.unit.plugins.ml2plus.drivers.'
            'mechanism_logger.LoggerPlusMechanismDriver.'
            'start_rpc_listeners',
            TestRpcListeners._start_rpc_listeners)
        with consume_patch, listener_patch:
            # Both the base ML2 servers and the test MD's server must
            # be returned by the plugin.
            servers = self.plugin.start_rpc_listeners()
        expected = sorted([topics.PLUGIN,
                           topics.SERVER_RESOURCE_VERSIONS,
                           topics.REPORTS,
                           'q-test-topic'])
        actual = sorted([server._target.topic for server in servers])
        self.assertEqual(expected, actual)
class TestEnsureTenant(Ml2PlusPluginV2TestCase):
def test_network(self):
with mock.patch.object(mech_logger.LoggerPlusMechanismDriver,