Merge "Break coupling between ML2 and L3 during create/update operations"
This commit is contained in:
commit
68c0e3d479
|
@ -22,6 +22,9 @@ from sqlalchemy import orm
|
|||
from sqlalchemy.orm import exc
|
||||
from sqlalchemy.orm import joinedload
|
||||
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.db import agents_db
|
||||
|
@ -29,6 +32,8 @@ from neutron.db import l3_agentschedulers_db as l3agent_sch_db
|
|||
from neutron.db import model_base
|
||||
from neutron.db import models_v2
|
||||
from neutron.i18n import _LI, _LW
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants as service_constants
|
||||
from neutron.plugins.ml2 import db as ml2_db
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -309,3 +314,27 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
|||
self.bind_dvr_router_servicenode(
|
||||
context, router_id, chosen_agent)
|
||||
return chosen_agent
|
||||
|
||||
|
||||
def _notify_l3_agent_new_port(resource, event, trigger, **kwargs):
|
||||
LOG.debug('Received %s %s' % (resource, event))
|
||||
port = kwargs['port']
|
||||
if not port:
|
||||
return
|
||||
|
||||
l3plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
mac_address_updated = kwargs.get('mac_address_updated')
|
||||
update_device_up = kwargs.get('update_device_up')
|
||||
context = kwargs['context']
|
||||
if mac_address_updated or update_device_up:
|
||||
l3plugin.dvr_vmarp_table_update(context, port, "add")
|
||||
if n_utils.is_dvr_serviced(port['device_owner']):
|
||||
l3plugin.dvr_update_router_addvm(context, port)
|
||||
|
||||
|
||||
def subscribe():
|
||||
registry.subscribe(
|
||||
_notify_l3_agent_new_port, resources.PORT, events.AFTER_UPDATE)
|
||||
registry.subscribe(
|
||||
_notify_l3_agent_new_port, resources.PORT, events.AFTER_CREATE)
|
||||
|
|
|
@ -34,6 +34,9 @@ from neutron.api.rpc.handlers import dvr_rpc
|
|||
from neutron.api.rpc.handlers import metadata_rpc
|
||||
from neutron.api.rpc.handlers import securitygroups_rpc
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
from neutron.common import constants as const
|
||||
from neutron.common import exceptions as exc
|
||||
from neutron.common import ipv6_utils
|
||||
|
@ -162,20 +165,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
if self.type_manager.network_matches_filters(network, filters)
|
||||
]
|
||||
|
||||
def _notify_l3_agent_new_port(self, context, port):
|
||||
if not port:
|
||||
return
|
||||
|
||||
# Whenever a DVR serviceable port comes up on a
|
||||
# node, it has to be communicated to the L3 Plugin
|
||||
# and agent for creating the respective namespaces.
|
||||
if (utils.is_dvr_serviced(port['device_owner'])):
|
||||
l3plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
if (utils.is_extension_supported(
|
||||
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
|
||||
l3plugin.dvr_update_router_addvm(context, port)
|
||||
|
||||
def _get_host_port_if_changed(self, mech_context, attrs):
|
||||
binding = mech_context._binding
|
||||
host = attrs and attrs.get(portbindings.HOST_ID)
|
||||
|
@ -943,7 +932,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
attrs = port[attributes.PORT]
|
||||
result, mech_context = self._create_port_db(context, port)
|
||||
new_host_port = self._get_host_port_if_changed(mech_context, attrs)
|
||||
self._notify_l3_agent_new_port(context, new_host_port)
|
||||
# notify any plugin that is interested in port create events
|
||||
kwargs = {'context': context, 'port': new_host_port}
|
||||
registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
|
||||
|
||||
try:
|
||||
self.mechanism_manager.create_port_postcommit(mech_context)
|
||||
|
@ -979,7 +970,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
if attrs and attrs.get(portbindings.HOST_ID):
|
||||
new_host_port = self._get_host_port_if_changed(
|
||||
obj['mech_context'], attrs)
|
||||
self._notify_l3_agent_new_port(context, new_host_port)
|
||||
kwargs = {'context': context, 'port': new_host_port}
|
||||
registry.notify(
|
||||
resources.PORT, events.AFTER_CREATE, self, **kwargs)
|
||||
|
||||
try:
|
||||
for obj in objects:
|
||||
|
@ -997,11 +990,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
def update_port(self, context, id, port):
|
||||
attrs = port[attributes.PORT]
|
||||
need_port_update_notify = False
|
||||
l3plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
is_dvr_enabled = utils.is_extension_supported(
|
||||
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)
|
||||
|
||||
session = context.session
|
||||
|
||||
# REVISIT: Serialize this operation with a semaphore to
|
||||
|
@ -1041,10 +1029,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.mechanism_manager.update_port_precommit(mech_context)
|
||||
|
||||
# Notifications must be sent after the above transaction is complete
|
||||
if mac_address_updated and l3plugin and is_dvr_enabled:
|
||||
# NOTE: "add" actually does a 'replace' operation
|
||||
l3plugin.dvr_vmarp_table_update(context, updated_port, "add")
|
||||
self._notify_l3_agent_new_port(context, new_host_port)
|
||||
kwargs = {
|
||||
'context': context,
|
||||
'port': new_host_port,
|
||||
'mac_address_updated': mac_address_updated,
|
||||
}
|
||||
registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)
|
||||
|
||||
# TODO(apech) - handle errors raised by update_port, potentially
|
||||
# by re-calling update_port with the previous attributes. For
|
||||
|
|
|
@ -19,15 +19,16 @@ from sqlalchemy.orm import exc
|
|||
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.handlers import dvr_rpc
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.i18n import _LW
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants as service_constants
|
||||
from neutron.plugins.ml2 import driver_api as api
|
||||
from neutron.plugins.ml2.drivers import type_tunnel
|
||||
# REVISIT(kmestery): Allow the type and mechanism drivers to supply the
|
||||
|
@ -167,16 +168,21 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||
port_id = plugin.update_port_status(rpc_context, port_id,
|
||||
q_const.PORT_STATUS_ACTIVE,
|
||||
host)
|
||||
l3plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
if (l3plugin and
|
||||
utils.is_extension_supported(l3plugin,
|
||||
q_const.L3_DISTRIBUTED_EXT_ALIAS)):
|
||||
try:
|
||||
port = plugin._get_port(rpc_context, port_id)
|
||||
l3plugin.dvr_vmarp_table_update(rpc_context, port, "add")
|
||||
except exceptions.PortNotFound:
|
||||
LOG.debug('Port %s not found during ARP update', port_id)
|
||||
try:
|
||||
# NOTE(armax): it's best to remove all objects from the
|
||||
# session, before we try to retrieve the new port object
|
||||
rpc_context.session.expunge_all()
|
||||
port = plugin._get_port(rpc_context, port_id)
|
||||
except exceptions.PortNotFound:
|
||||
LOG.debug('Port %s not found during update', port_id)
|
||||
else:
|
||||
kwargs = {
|
||||
'context': rpc_context,
|
||||
'port': port,
|
||||
'update_device_up': True
|
||||
}
|
||||
registry.notify(
|
||||
resources.PORT, events.AFTER_UPDATE, plugin, **kwargs)
|
||||
|
||||
|
||||
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
|
||||
|
|
|
@ -56,6 +56,8 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin,
|
|||
cfg.CONF.router_scheduler_driver)
|
||||
self.start_periodic_l3_agent_status_check()
|
||||
super(L3RouterPlugin, self).__init__()
|
||||
if 'dvr' in self.supported_extension_aliases:
|
||||
l3_dvrscheduler_db.subscribe()
|
||||
|
||||
def setup_rpc(self):
|
||||
# RPC support
|
||||
|
|
|
@ -1267,6 +1267,8 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
|
|||
def setUp(self):
|
||||
super(TestMl2PluginCreateUpdateDeletePort, self).setUp()
|
||||
self.context = mock.MagicMock()
|
||||
self.notify_p = mock.patch('neutron.callbacks.registry.notify')
|
||||
self.notify = self.notify_p.start()
|
||||
|
||||
def _ensure_transaction_is_closed(self):
|
||||
transaction = self.context.session.begin(subtransactions=True)
|
||||
|
@ -1284,9 +1286,8 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
|
|||
return_value=new_host_port)
|
||||
plugin._check_mac_update_allowed = mock.Mock(return_value=True)
|
||||
|
||||
plugin._notify_l3_agent_new_port = mock.Mock()
|
||||
plugin._notify_l3_agent_new_port.side_effect = (
|
||||
lambda c, p: self._ensure_transaction_is_closed())
|
||||
self.notify.side_effect = (
|
||||
lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
|
||||
|
||||
return plugin
|
||||
|
||||
|
@ -1302,33 +1303,28 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
|
|||
|
||||
plugin.create_port(self.context, mock.MagicMock())
|
||||
|
||||
plugin._notify_l3_agent_new_port.assert_called_once_with(
|
||||
self.context, new_host_port)
|
||||
kwargs = {'context': self.context, 'port': new_host_port}
|
||||
self.notify.assert_called_once_with('port', 'after_create',
|
||||
plugin, **kwargs)
|
||||
|
||||
def test_update_port_rpc_outside_transaction(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
|
||||
mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'),
|
||||
mock.patch.object(manager.NeutronManager, 'get_service_plugins'),
|
||||
) as (init, super_update_port, get_service_plugins):
|
||||
) as (init, super_update_port):
|
||||
init.return_value = None
|
||||
l3plugin = mock.Mock()
|
||||
l3plugin.supported_extension_aliases = [
|
||||
constants.L3_DISTRIBUTED_EXT_ALIAS,
|
||||
]
|
||||
get_service_plugins.return_value = {
|
||||
service_constants.L3_ROUTER_NAT: l3plugin,
|
||||
}
|
||||
|
||||
new_host_port = mock.Mock()
|
||||
plugin = self._create_plugin_for_create_update_port(new_host_port)
|
||||
|
||||
plugin.update_port(self.context, 'fake_id', mock.MagicMock())
|
||||
|
||||
plugin._notify_l3_agent_new_port.assert_called_once_with(
|
||||
self.context, new_host_port)
|
||||
l3plugin.dvr_vmarp_table_update.assert_called_once_with(
|
||||
self.context, mock.ANY, "add")
|
||||
kwargs = {
|
||||
'context': self.context,
|
||||
'port': new_host_port,
|
||||
'mac_address_updated': True,
|
||||
}
|
||||
self.notify.assert_called_once_with('port', 'after_update',
|
||||
plugin, **kwargs)
|
||||
|
||||
def test_vmarp_table_update_outside_of_delete_transaction(self):
|
||||
l3plugin = mock.Mock()
|
||||
|
|
|
@ -44,45 +44,32 @@ class RpcCallbacksTestCase(base.BaseTestCase):
|
|||
self.type_manager)
|
||||
self.manager = mock.patch.object(
|
||||
plugin_rpc.manager, 'NeutronManager').start()
|
||||
self.l3plugin = mock.Mock()
|
||||
self.manager.get_service_plugins.return_value = {
|
||||
'L3_ROUTER_NAT': self.l3plugin
|
||||
}
|
||||
self.plugin = self.manager.get_plugin()
|
||||
|
||||
def _test_update_device_up(self, extensions, kwargs):
|
||||
def _test_update_device_up(self):
|
||||
kwargs = {
|
||||
'agent_id': 'foo_agent',
|
||||
'device': 'foo_device'
|
||||
}
|
||||
with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
|
||||
'._device_to_port_id'):
|
||||
type(self.l3plugin).supported_extension_aliases = (
|
||||
mock.PropertyMock(return_value=extensions))
|
||||
self.callbacks.update_device_up(mock.ANY, **kwargs)
|
||||
with mock.patch('neutron.callbacks.registry.notify') as notify:
|
||||
self.callbacks.update_device_up(mock.Mock(), **kwargs)
|
||||
return notify
|
||||
|
||||
def test_update_device_up_without_dvr(self):
|
||||
def test_update_device_up_notify(self):
|
||||
notify = self._test_update_device_up()
|
||||
kwargs = {
|
||||
'agent_id': 'foo_agent',
|
||||
'device': 'foo_device'
|
||||
'context': mock.ANY, 'port': mock.ANY, 'update_device_up': True
|
||||
}
|
||||
self._test_update_device_up(['router'], kwargs)
|
||||
self.assertFalse(self.l3plugin.dvr_vmarp_table_update.call_count)
|
||||
notify.assert_called_once_with(
|
||||
'port', 'after_update', self.plugin, **kwargs)
|
||||
|
||||
def test_update_device_up_with_dvr(self):
|
||||
kwargs = {
|
||||
'agent_id': 'foo_agent',
|
||||
'device': 'foo_device'
|
||||
}
|
||||
self._test_update_device_up(['router', 'dvr'], kwargs)
|
||||
self.l3plugin.dvr_vmarp_table_update.assert_called_once_with(
|
||||
mock.ANY, mock.ANY, 'add')
|
||||
|
||||
def test_update_device_up_with_dvr_when_port_not_found(self):
|
||||
kwargs = {
|
||||
'agent_id': 'foo_agent',
|
||||
'device': 'foo_device'
|
||||
}
|
||||
self.l3plugin.dvr_vmarp_table_update.side_effect = (
|
||||
def test_update_device_up_notify_not_sent_with_port_not_found(self):
|
||||
self.plugin._get_port.side_effect = (
|
||||
exceptions.PortNotFound(port_id='foo_port_id'))
|
||||
self._test_update_device_up(['router', 'dvr'], kwargs)
|
||||
self.assertTrue(self.l3plugin.dvr_vmarp_table_update.call_count)
|
||||
notify = self._test_update_device_up()
|
||||
self.assertFalse(notify.call_count)
|
||||
|
||||
def test_get_device_details_without_port_context(self):
|
||||
self.plugin.get_bound_port_context.return_value = None
|
||||
|
|
Loading…
Reference in New Issue