Break coupling between ML2 and L3 during create/update operations

This is an initial patch in a series that, by using an event framework,
cleans up the relationship between ML2 and L3, so that they are no longer
tightly coupled. A follow-up will take address the coupling during the
port delete operation.

The newly introduced notification hooks not only benefit the L3 service
plugin, but any other plugin that has an interest in knowing about port
events.

Long term, the notification bits can move in a more 'common' place so that
other plugins can take advantage of them, but as mentioned in a parent patch,
the perestroika is not quite there yet.

Related-blueprint: services-split
Related-blueprint: plugin-interface-perestroika

Change-Id: I6527b1cb53a71a1f68329a7a3b1878094558f8c2
This commit is contained in:
armando-migliaccio 2015-02-05 08:40:04 -08:00
parent 48637c8933
commit 3f3874717c
6 changed files with 96 additions and 86 deletions

View File

@ -22,6 +22,9 @@ from sqlalchemy import orm
from sqlalchemy.orm import exc from sqlalchemy.orm import exc
from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as q_const from neutron.common import constants as q_const
from neutron.common import utils as n_utils from neutron.common import utils as n_utils
from neutron.db import agents_db from neutron.db import agents_db
@ -29,6 +32,8 @@ from neutron.db import l3_agentschedulers_db as l3agent_sch_db
from neutron.db import model_base from neutron.db import model_base
from neutron.db import models_v2 from neutron.db import models_v2
from neutron.i18n import _LI, _LW from neutron.i18n import _LI, _LW
from neutron import manager
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2 import db as ml2_db from neutron.plugins.ml2 import db as ml2_db
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -309,3 +314,27 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
self.bind_dvr_router_servicenode( self.bind_dvr_router_servicenode(
context, router_id, chosen_agent) context, router_id, chosen_agent)
return chosen_agent return chosen_agent
def _notify_l3_agent_new_port(resource, event, trigger, **kwargs):
LOG.debug('Received %s %s' % (resource, event))
port = kwargs['port']
if not port:
return
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
mac_address_updated = kwargs.get('mac_address_updated')
update_device_up = kwargs.get('update_device_up')
context = kwargs['context']
if mac_address_updated or update_device_up:
l3plugin.dvr_vmarp_table_update(context, port, "add")
if n_utils.is_dvr_serviced(port['device_owner']):
l3plugin.dvr_update_router_addvm(context, port)
def subscribe():
registry.subscribe(
_notify_l3_agent_new_port, resources.PORT, events.AFTER_UPDATE)
registry.subscribe(
_notify_l3_agent_new_port, resources.PORT, events.AFTER_CREATE)

View File

@ -33,6 +33,9 @@ from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import metadata_rpc from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as const from neutron.common import constants as const
from neutron.common import exceptions as exc from neutron.common import exceptions as exc
from neutron.common import ipv6_utils from neutron.common import ipv6_utils
@ -161,20 +164,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if self.type_manager.network_matches_filters(network, filters) if self.type_manager.network_matches_filters(network, filters)
] ]
def _notify_l3_agent_new_port(self, context, port):
if not port:
return
# Whenever a DVR serviceable port comes up on a
# node, it has to be communicated to the L3 Plugin
# and agent for creating the respective namespaces.
if (utils.is_dvr_serviced(port['device_owner'])):
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if (utils.is_extension_supported(
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
l3plugin.dvr_update_router_addvm(context, port)
def _get_host_port_if_changed(self, mech_context, attrs): def _get_host_port_if_changed(self, mech_context, attrs):
binding = mech_context._binding binding = mech_context._binding
host = attrs and attrs.get(portbindings.HOST_ID) host = attrs and attrs.get(portbindings.HOST_ID)
@ -936,7 +925,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
attrs = port[attributes.PORT] attrs = port[attributes.PORT]
result, mech_context = self._create_port_db(context, port) result, mech_context = self._create_port_db(context, port)
new_host_port = self._get_host_port_if_changed(mech_context, attrs) new_host_port = self._get_host_port_if_changed(mech_context, attrs)
self._notify_l3_agent_new_port(context, new_host_port) # notify any plugin that is interested in port create events
kwargs = {'context': context, 'port': new_host_port}
registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
try: try:
self.mechanism_manager.create_port_postcommit(mech_context) self.mechanism_manager.create_port_postcommit(mech_context)
@ -972,7 +963,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if attrs and attrs.get(portbindings.HOST_ID): if attrs and attrs.get(portbindings.HOST_ID):
new_host_port = self._get_host_port_if_changed( new_host_port = self._get_host_port_if_changed(
obj['mech_context'], attrs) obj['mech_context'], attrs)
self._notify_l3_agent_new_port(context, new_host_port) kwargs = {'context': context, 'port': new_host_port}
registry.notify(
resources.PORT, events.AFTER_CREATE, self, **kwargs)
try: try:
for obj in objects: for obj in objects:
@ -990,11 +983,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def update_port(self, context, id, port): def update_port(self, context, id, port):
attrs = port[attributes.PORT] attrs = port[attributes.PORT]
need_port_update_notify = False need_port_update_notify = False
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
is_dvr_enabled = utils.is_extension_supported(
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)
session = context.session session = context.session
# REVISIT: Serialize this operation with a semaphore to # REVISIT: Serialize this operation with a semaphore to
@ -1034,10 +1022,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.mechanism_manager.update_port_precommit(mech_context) self.mechanism_manager.update_port_precommit(mech_context)
# Notifications must be sent after the above transaction is complete # Notifications must be sent after the above transaction is complete
if mac_address_updated and l3plugin and is_dvr_enabled: kwargs = {
# NOTE: "add" actually does a 'replace' operation 'context': context,
l3plugin.dvr_vmarp_table_update(context, updated_port, "add") 'port': new_host_port,
self._notify_l3_agent_new_port(context, new_host_port) 'mac_address_updated': mac_address_updated,
}
registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)
# TODO(apech) - handle errors raised by update_port, potentially # TODO(apech) - handle errors raised by update_port, potentially
# by re-calling update_port with the previous attributes. For # by re-calling update_port with the previous attributes. For

View File

@ -19,15 +19,16 @@ from sqlalchemy.orm import exc
from neutron.agent import securitygroups_rpc as sg_rpc from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc from neutron.api.rpc.handlers import dvr_rpc
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as q_const from neutron.common import constants as q_const
from neutron.common import exceptions from neutron.common import exceptions
from neutron.common import rpc as n_rpc from neutron.common import rpc as n_rpc
from neutron.common import topics from neutron.common import topics
from neutron.common import utils
from neutron.extensions import portbindings from neutron.extensions import portbindings
from neutron.i18n import _LW from neutron.i18n import _LW
from neutron import manager from neutron import manager
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel from neutron.plugins.ml2.drivers import type_tunnel
# REVISIT(kmestery): Allow the type and mechanism drivers to supply the # REVISIT(kmestery): Allow the type and mechanism drivers to supply the
@ -167,16 +168,21 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
port_id = plugin.update_port_status(rpc_context, port_id, port_id = plugin.update_port_status(rpc_context, port_id,
q_const.PORT_STATUS_ACTIVE, q_const.PORT_STATUS_ACTIVE,
host) host)
l3plugin = manager.NeutronManager.get_service_plugins().get( try:
service_constants.L3_ROUTER_NAT) # NOTE(armax): it's best to remove all objects from the
if (l3plugin and # session, before we try to retrieve the new port object
utils.is_extension_supported(l3plugin, rpc_context.session.expunge_all()
q_const.L3_DISTRIBUTED_EXT_ALIAS)): port = plugin._get_port(rpc_context, port_id)
try: except exceptions.PortNotFound:
port = plugin._get_port(rpc_context, port_id) LOG.debug('Port %s not found during update', port_id)
l3plugin.dvr_vmarp_table_update(rpc_context, port, "add") else:
except exceptions.PortNotFound: kwargs = {
LOG.debug('Port %s not found during ARP update', port_id) 'context': rpc_context,
'port': port,
'update_device_up': True
}
registry.notify(
resources.PORT, events.AFTER_UPDATE, plugin, **kwargs)
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,

View File

@ -56,6 +56,8 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin,
cfg.CONF.router_scheduler_driver) cfg.CONF.router_scheduler_driver)
self.start_periodic_l3_agent_status_check() self.start_periodic_l3_agent_status_check()
super(L3RouterPlugin, self).__init__() super(L3RouterPlugin, self).__init__()
if 'dvr' in self.supported_extension_aliases:
l3_dvrscheduler_db.subscribe()
def setup_rpc(self): def setup_rpc(self):
# RPC support # RPC support

View File

@ -1251,6 +1251,8 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestMl2PluginCreateUpdateDeletePort, self).setUp() super(TestMl2PluginCreateUpdateDeletePort, self).setUp()
self.context = mock.MagicMock() self.context = mock.MagicMock()
self.notify_p = mock.patch('neutron.callbacks.registry.notify')
self.notify = self.notify_p.start()
def _ensure_transaction_is_closed(self): def _ensure_transaction_is_closed(self):
transaction = self.context.session.begin(subtransactions=True) transaction = self.context.session.begin(subtransactions=True)
@ -1268,9 +1270,8 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
return_value=new_host_port) return_value=new_host_port)
plugin._check_mac_update_allowed = mock.Mock(return_value=True) plugin._check_mac_update_allowed = mock.Mock(return_value=True)
plugin._notify_l3_agent_new_port = mock.Mock() self.notify.side_effect = (
plugin._notify_l3_agent_new_port.side_effect = ( lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
lambda c, p: self._ensure_transaction_is_closed())
return plugin return plugin
@ -1286,33 +1287,28 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
plugin.create_port(self.context, mock.MagicMock()) plugin.create_port(self.context, mock.MagicMock())
plugin._notify_l3_agent_new_port.assert_called_once_with( kwargs = {'context': self.context, 'port': new_host_port}
self.context, new_host_port) self.notify.assert_called_once_with('port', 'after_create',
plugin, **kwargs)
def test_update_port_rpc_outside_transaction(self): def test_update_port_rpc_outside_transaction(self):
with contextlib.nested( with contextlib.nested(
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'), mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'), mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'),
mock.patch.object(manager.NeutronManager, 'get_service_plugins'), ) as (init, super_update_port):
) as (init, super_update_port, get_service_plugins):
init.return_value = None init.return_value = None
l3plugin = mock.Mock()
l3plugin.supported_extension_aliases = [
constants.L3_DISTRIBUTED_EXT_ALIAS,
]
get_service_plugins.return_value = {
service_constants.L3_ROUTER_NAT: l3plugin,
}
new_host_port = mock.Mock() new_host_port = mock.Mock()
plugin = self._create_plugin_for_create_update_port(new_host_port) plugin = self._create_plugin_for_create_update_port(new_host_port)
plugin.update_port(self.context, 'fake_id', mock.MagicMock()) plugin.update_port(self.context, 'fake_id', mock.MagicMock())
plugin._notify_l3_agent_new_port.assert_called_once_with( kwargs = {
self.context, new_host_port) 'context': self.context,
l3plugin.dvr_vmarp_table_update.assert_called_once_with( 'port': new_host_port,
self.context, mock.ANY, "add") 'mac_address_updated': True,
}
self.notify.assert_called_once_with('port', 'after_update',
plugin, **kwargs)
def test_vmarp_table_update_outside_of_delete_transaction(self): def test_vmarp_table_update_outside_of_delete_transaction(self):
l3plugin = mock.Mock() l3plugin = mock.Mock()

View File

@ -44,45 +44,32 @@ class RpcCallbacksTestCase(base.BaseTestCase):
self.type_manager) self.type_manager)
self.manager = mock.patch.object( self.manager = mock.patch.object(
plugin_rpc.manager, 'NeutronManager').start() plugin_rpc.manager, 'NeutronManager').start()
self.l3plugin = mock.Mock()
self.manager.get_service_plugins.return_value = {
'L3_ROUTER_NAT': self.l3plugin
}
self.plugin = self.manager.get_plugin() self.plugin = self.manager.get_plugin()
def _test_update_device_up(self, extensions, kwargs): def _test_update_device_up(self):
kwargs = {
'agent_id': 'foo_agent',
'device': 'foo_device'
}
with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin' with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
'._device_to_port_id'): '._device_to_port_id'):
type(self.l3plugin).supported_extension_aliases = ( with mock.patch('neutron.callbacks.registry.notify') as notify:
mock.PropertyMock(return_value=extensions)) self.callbacks.update_device_up(mock.Mock(), **kwargs)
self.callbacks.update_device_up(mock.ANY, **kwargs) return notify
def test_update_device_up_without_dvr(self): def test_update_device_up_notify(self):
notify = self._test_update_device_up()
kwargs = { kwargs = {
'agent_id': 'foo_agent', 'context': mock.ANY, 'port': mock.ANY, 'update_device_up': True
'device': 'foo_device'
} }
self._test_update_device_up(['router'], kwargs) notify.assert_called_once_with(
self.assertFalse(self.l3plugin.dvr_vmarp_table_update.call_count) 'port', 'after_update', self.plugin, **kwargs)
def test_update_device_up_with_dvr(self): def test_update_device_up_notify_not_sent_with_port_not_found(self):
kwargs = { self.plugin._get_port.side_effect = (
'agent_id': 'foo_agent',
'device': 'foo_device'
}
self._test_update_device_up(['router', 'dvr'], kwargs)
self.l3plugin.dvr_vmarp_table_update.assert_called_once_with(
mock.ANY, mock.ANY, 'add')
def test_update_device_up_with_dvr_when_port_not_found(self):
kwargs = {
'agent_id': 'foo_agent',
'device': 'foo_device'
}
self.l3plugin.dvr_vmarp_table_update.side_effect = (
exceptions.PortNotFound(port_id='foo_port_id')) exceptions.PortNotFound(port_id='foo_port_id'))
self._test_update_device_up(['router', 'dvr'], kwargs) notify = self._test_update_device_up()
self.assertTrue(self.l3plugin.dvr_vmarp_table_update.call_count) self.assertFalse(notify.call_count)
def test_get_device_details_without_port_context(self): def test_get_device_details_without_port_context(self):
self.plugin.get_bound_port_context.return_value = None self.plugin.get_bound_port_context.return_value = None