Merge "use payloads for PORT AFTER_CREATE events"
This commit is contained in:
commit
df94641b43
@ -117,8 +117,12 @@ class DhcpAgentNotifyAPI(object):
|
||||
# TODO(boden): remove shim below once all events use payloads
|
||||
if resource == resources.NETWORK:
|
||||
callback = self._native_event_send_dhcp_notification_payload
|
||||
|
||||
registry.subscribe(callback, resource, events.AFTER_CREATE)
|
||||
if resource == resources.PORT:
|
||||
registry.subscribe(
|
||||
self._native_event_send_dhcp_notification_payload,
|
||||
resource, events.AFTER_CREATE)
|
||||
else:
|
||||
registry.subscribe(callback, resource, events.AFTER_CREATE)
|
||||
registry.subscribe(callback, resource, events.AFTER_UPDATE)
|
||||
registry.subscribe(callback, resource, events.AFTER_DELETE)
|
||||
|
||||
|
@ -533,17 +533,17 @@ def _dvr_handle_unbound_allowed_addr_pair_del(
|
||||
context, port, fixed_ips_to_delete=aa_fixed_ips)
|
||||
|
||||
|
||||
def _notify_l3_agent_new_port(resource, event, trigger, **kwargs):
|
||||
def _notify_l3_agent_new_port(resource, event, trigger, payload=None):
|
||||
LOG.debug('Received %(resource)s %(event)s', {
|
||||
'resource': resource,
|
||||
'event': event})
|
||||
port = kwargs.get('port')
|
||||
port = payload.latest_state
|
||||
if not port:
|
||||
return
|
||||
|
||||
if n_utils.is_dvr_serviced(port['device_owner']):
|
||||
l3plugin = directory.get_plugin(plugin_constants.L3)
|
||||
context = kwargs['context']
|
||||
context = payload.context
|
||||
l3plugin.dvr_handle_new_service_port(context, port)
|
||||
l3plugin.update_arp_entry_for_dvr_service_port(context, port)
|
||||
|
||||
|
@ -41,8 +41,15 @@ DHCP_RULE_PORT = {4: (67, 68, const.IPv4), 6: (547, 546, const.IPv6)}
|
||||
class SecurityGroupServerNotifierRpcMixin(sg_db.SecurityGroupDbMixin):
|
||||
"""Mixin class to add agent-based security group implementation."""
|
||||
|
||||
@registry.receives(resources.PORT, [events.AFTER_CREATE,
|
||||
events.AFTER_UPDATE,
|
||||
@registry.receives(resources.PORT, [events.AFTER_CREATE])
|
||||
def _notify_sg_on_port_after_update(
|
||||
self, resource, event, trigger, payload=None):
|
||||
# TODO(boden): refact back into single method when all callbacks are
|
||||
# moved to payload style events
|
||||
self.notify_security_groups_member_updated(
|
||||
payload.context, payload.latest_state)
|
||||
|
||||
@registry.receives(resources.PORT, [events.AFTER_UPDATE,
|
||||
events.AFTER_DELETE])
|
||||
def notify_sg_on_port_change(self, resource, event, trigger, context,
|
||||
port, *args, **kwargs):
|
||||
|
@ -455,12 +455,13 @@ def _filter_by_subnet(context, fixed_ips):
|
||||
return [str(ip['ip_address']) for ip in fixed_ips]
|
||||
|
||||
|
||||
def _create_port_in_external_dns_service(resource, event, trigger, **kwargs):
|
||||
def _create_port_in_external_dns_service(resource, event,
|
||||
trigger, payload=None):
|
||||
dns_driver = _get_dns_driver()
|
||||
if not dns_driver:
|
||||
return
|
||||
context = kwargs['context']
|
||||
port = kwargs['port']
|
||||
context = payload.context
|
||||
port = payload.latest_state
|
||||
dns_data_db = port_obj.PortDNS.get_object(
|
||||
context, port_id=port['id'])
|
||||
if not (dns_data_db and dns_data_db['current_dns_name']):
|
||||
|
@ -1462,8 +1462,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
# add network to port dict to save a DB call on dhcp notification
|
||||
result['network'] = mech_context.network.current
|
||||
# notify any plugin that is interested in port create events
|
||||
kwargs = {'context': context, 'port': result}
|
||||
registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
|
||||
registry.publish(resources.PORT, events.AFTER_CREATE, self,
|
||||
payload=events.DBEventPayload(
|
||||
context, states=(result,),
|
||||
resource_id=result['id']))
|
||||
|
||||
try:
|
||||
self.mechanism_manager.create_port_postcommit(mech_context)
|
||||
|
@ -416,8 +416,15 @@ class NovaSegmentNotifier(object):
|
||||
'routed network segment %(segment_id)s',
|
||||
{'host': event.host, 'segment_id': segment_id})
|
||||
|
||||
@registry.receives(resources.PORT,
|
||||
[events.AFTER_CREATE, events.AFTER_DELETE])
|
||||
@registry.receives(resources.PORT, [events.AFTER_CREATE])
|
||||
def _notify_port_created(self, resource, event, trigger,
|
||||
payload=None):
|
||||
# TODO(boden): refactor back into 1 method when all code is moved
|
||||
# to event payloads
|
||||
return self._notify_port_created_or_deleted(
|
||||
resource, event, trigger, payload.context, payload.latest_state)
|
||||
|
||||
@registry.receives(resources.PORT, [events.AFTER_DELETE])
|
||||
def _notify_port_created_or_deleted(self, resource, event, trigger,
|
||||
context, port, **kwargs):
|
||||
if not self._does_port_require_nova_inventory_update(port):
|
||||
|
@ -266,8 +266,13 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||
for res in (resources.PORT, resources.SUBNET):
|
||||
self.notifier._unsubscribed_resources = []
|
||||
kwargs = {res: {}}
|
||||
registry.notify(res, events.AFTER_CREATE, self,
|
||||
context=mock.Mock(), **kwargs)
|
||||
if res == resources.PORT:
|
||||
registry.publish(res, events.AFTER_CREATE, self,
|
||||
payload=events.DBEventPayload(
|
||||
mock.Mock(), states=({res: {}},)))
|
||||
else:
|
||||
registry.notify(res, events.AFTER_CREATE, self,
|
||||
context=mock.Mock(), **kwargs)
|
||||
# don't unsubscribe until all three types are observed
|
||||
self.assertEqual([], self.notifier._unsubscribed_resources)
|
||||
registry.notify(res, events.AFTER_UPDATE, self,
|
||||
@ -277,8 +282,13 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||
context=mock.Mock(), **kwargs)
|
||||
self.assertEqual([res], self.notifier._unsubscribed_resources)
|
||||
# after first time, no further unsubscribing should happen
|
||||
registry.notify(res, events.AFTER_CREATE, self,
|
||||
context=mock.Mock(), **kwargs)
|
||||
if res == resources.PORT:
|
||||
registry.publish(res, events.AFTER_CREATE, self,
|
||||
payload=events.DBEventPayload(
|
||||
mock.Mock(), states=({res: {}})))
|
||||
else:
|
||||
registry.notify(res, events.AFTER_CREATE, self,
|
||||
context=mock.Mock(), **kwargs)
|
||||
self.assertEqual([res], self.notifier._unsubscribed_resources)
|
||||
|
||||
for res in [resources.NETWORK]:
|
||||
|
@ -1080,8 +1080,10 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
||||
|
||||
def test_port_after_create_outside_transaction(self):
|
||||
self.tx_open = True
|
||||
receive = lambda *a, **k: setattr(self, 'tx_open',
|
||||
k['context'].session.is_active)
|
||||
|
||||
def receive(r, e, t, payload=None):
|
||||
setattr(self, 'tx_open', payload.context.session.is_active)
|
||||
|
||||
registry.subscribe(receive, resources.PORT, events.AFTER_CREATE)
|
||||
with self.port():
|
||||
self.assertFalse(self.tx_open)
|
||||
@ -1752,7 +1754,10 @@ class TestMl2PortsV2WithRevisionPlugin(Ml2PluginV2TestCase):
|
||||
updated_ports = []
|
||||
created_ports = []
|
||||
ureceiver = lambda *a, **k: updated_ports.append(k['port'])
|
||||
creceiver = lambda *a, **k: created_ports.append(k['port'])
|
||||
|
||||
def creceiver(r, e, t, payload=None):
|
||||
created_ports.append(payload.latest_state)
|
||||
|
||||
registry.subscribe(ureceiver, resources.PORT,
|
||||
events.AFTER_UPDATE)
|
||||
registry.subscribe(creceiver, resources.PORT,
|
||||
|
@ -21,9 +21,12 @@ from unittest import mock
|
||||
from neutron_lib.api.definitions import l3_ext_ha_mode
|
||||
from neutron_lib.api.definitions import portbindings
|
||||
from neutron_lib.api.definitions import router_availability_zone
|
||||
from neutron_lib.callbacks import events
|
||||
from neutron_lib.callbacks import resources
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import context as n_context
|
||||
from neutron_lib.exceptions import l3 as l3_exc
|
||||
from neutron_lib import fixture
|
||||
from neutron_lib.plugins import constants as plugin_constants
|
||||
from neutron_lib.plugins import directory
|
||||
from neutron_lib import rpc as n_rpc
|
||||
@ -810,6 +813,7 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin,
|
||||
service_plugins = None
|
||||
super(L3DvrSchedulerTestCase, self).setUp('ml2',
|
||||
service_plugins=service_plugins)
|
||||
self.useFixture(fixture.CallbackRegistryFixture())
|
||||
self.setup_coreplugin('ml2')
|
||||
self.adminContext = n_context.get_admin_context()
|
||||
self.dut = L3DvrScheduler()
|
||||
@ -947,35 +951,28 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin,
|
||||
self.assertFalse(l3plugin.get_dvr_routers_to_remove.called)
|
||||
|
||||
def test__notify_l3_agent_new_port_action(self):
|
||||
kwargs = {
|
||||
'context': self.adminContext,
|
||||
'original_port': None,
|
||||
'port': {
|
||||
'device_owner': DEVICE_OWNER_COMPUTE,
|
||||
},
|
||||
}
|
||||
port_dict = {'device_owner': DEVICE_OWNER_COMPUTE}
|
||||
l3plugin = mock.Mock()
|
||||
directory.add_plugin(plugin_constants.L3, l3plugin)
|
||||
l3_dvrscheduler_db._notify_l3_agent_new_port(
|
||||
'port', 'after_create', mock.ANY, **kwargs)
|
||||
resources.PORT, events.AFTER_CREATE, mock.ANY,
|
||||
payload=events.DBEventPayload(
|
||||
self.adminContext, states=(port_dict,)))
|
||||
l3plugin.update_arp_entry_for_dvr_service_port.\
|
||||
assert_called_once_with(
|
||||
self.adminContext, kwargs.get('port'))
|
||||
self.adminContext, port_dict)
|
||||
l3plugin.dvr_handle_new_service_port.assert_called_once_with(
|
||||
self.adminContext, kwargs.get('port'))
|
||||
self.adminContext, port_dict)
|
||||
|
||||
def test__notify_l3_agent_new_port_no_action(self):
|
||||
kwargs = {
|
||||
'context': self.adminContext,
|
||||
'original_port': None,
|
||||
'port': {
|
||||
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX + 'None',
|
||||
}
|
||||
}
|
||||
port_dict = {
|
||||
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX + 'None'}
|
||||
l3plugin = mock.Mock()
|
||||
directory.add_plugin(plugin_constants.L3, l3plugin)
|
||||
l3_dvrscheduler_db._notify_l3_agent_new_port(
|
||||
'port', 'after_create', mock.ANY, **kwargs)
|
||||
resources.PORT, events.AFTER_CREATE, mock.ANY,
|
||||
payload=events.DBEventPayload(
|
||||
self.adminContext, states=(port_dict,)))
|
||||
self.assertFalse(
|
||||
l3plugin.update_arp_entry_for_dvr_service_port.called)
|
||||
self.assertFalse(
|
||||
|
Loading…
x
Reference in New Issue
Block a user