use callback payloads for SUBNET

This patch switches over to callback payloads for
SUBNET events.

Change-Id: Ic4c3490aed4f899293be993d4663bb537c34ab8b
This commit is contained in:
Nurmatov Mamatisa 2021-05-26 16:27:23 +03:00
parent 9a760b9b14
commit cd8c4f7e30
9 changed files with 102 additions and 69 deletions

View File

@ -114,7 +114,7 @@ class DhcpAgentNotifyAPI(object):
callback = self._native_event_send_dhcp_notification
# TODO(boden): remove shim below once all events use payloads
if resource == resources.NETWORK:
if resource in [resources.NETWORK, resources.SUBNET]:
callback = self._native_event_send_dhcp_notification_payload
if resource == resources.PORT:
registry.subscribe(

View File

@ -500,10 +500,13 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# cleanup if a network-owned port snuck in without failing
for subnet in subnets:
self._delete_subnet(context, subnet)
# TODO(ralonsoh): use payloads
registry.notify(resources.SUBNET, events.AFTER_DELETE,
self, context=context, subnet=subnet.to_dict(),
for_net_delete=True)
registry.publish(
resources.SUBNET, events.AFTER_DELETE, self,
payload=events.DBEventPayload(
context,
resource_id=subnet['id'],
metadata={'for_net_delete': True},
states=(subnet.to_dict(),)))
with db_api.CONTEXT_WRITER.using(context):
network_db = self._get_network(context, id)
network = self._make_network_dict(network_db, context=context)
@ -942,10 +945,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if gateway_ip:
self.ipam.validate_gw_out_of_pools(gateway_ip, pools)
kwargs = {'context': context, 'original_subnet': orig,
'request': s}
registry.notify(resources.SUBNET, events.BEFORE_UPDATE,
self, **kwargs)
registry.publish(
resources.SUBNET, events.BEFORE_UPDATE, self,
payload=events.DBEventPayload(
context,
resource_id=id,
states=(orig, s,)))
with db_api.CONTEXT_WRITER.using(context):
subnet, changes = self.ipam.update_db_subnet(
@ -996,10 +1001,13 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if routers:
self.l3_rpc_notifier.routers_updated(context, routers)
kwargs = {'context': context, 'subnet': result,
'original_subnet': orig}
registry.notify(resources.SUBNET, events.AFTER_UPDATE, self,
**kwargs)
registry.publish(
resources.SUBNET, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
context,
resource_id=result['id'],
states=(orig, result,)))
return result
def _subnet_get_user_allocation(self, context, subnet_id):
@ -1053,15 +1061,21 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
payload=events.DBEventPayload(context, resource_id=subnet.id))
self._remove_subnet_ip_allocations_from_ports(context, subnet)
self._delete_subnet(context, subnet)
registry.notify(resources.SUBNET, events.AFTER_DELETE,
self, context=context, subnet=subnet.to_dict())
registry.publish(
resources.SUBNET, events.AFTER_DELETE, self,
payload=events.DBEventPayload(
context,
resource_id=id,
states=(subnet.to_dict(),)))
def _delete_subnet(self, context, subnet):
with db_api.exc_to_retry(sql_exc.IntegrityError), \
db_api.CONTEXT_WRITER.using(context):
registry.notify(resources.SUBNET, events.PRECOMMIT_DELETE,
self, context=context, subnet_id=subnet.id,
subnet_obj=subnet)
registry.publish(
resources.SUBNET, events.PRECOMMIT_DELETE, self,
payload=events.DBEventPayload(context,
resource_id=subnet.id,
states=(subnet,)))
subnet.delete()
# Delete related ipam subnet manually,
# since there is no FK relationship

View File

@ -1930,13 +1930,13 @@ class L3RpcNotifierMixin(object):
@staticmethod
@registry.receives(resources.SUBNET, [events.AFTER_UPDATE])
def _notify_subnet_gateway_ip_update(resource, event, trigger, **kwargs):
def _notify_subnet_gateway_ip_update(resource, event, trigger, payload):
l3plugin = directory.get_plugin(plugin_constants.L3)
if not l3plugin:
return
context = kwargs['context']
orig = kwargs['original_subnet']
updated = kwargs['subnet']
context = payload.context
orig = payload.states[0]
updated = payload.latest_state
if orig['gateway_ip'] == updated['gateway_ip']:
return
network_id = updated['network_id']

View File

@ -39,7 +39,8 @@ LOG = logging.getLogger(__name__)
class _ObjectChangeHandler(object):
_PAYLOAD_RESOURCES = (resources.NETWORK,
resources.ADDRESS_GROUP,
resources.SECURITY_GROUP_RULE,)
resources.SECURITY_GROUP_RULE,
resources.SUBNET)
def __init__(self, resource, object_class, resource_push_api):
self._resource = resource

View File

@ -1265,8 +1265,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _before_create_subnet(self, context, subnet):
subnet_data = subnet[subnet_def.RESOURCE_NAME]
registry.notify(resources.SUBNET, events.BEFORE_CREATE, self,
context=context, subnet=subnet_data)
registry.publish(
resources.SUBNET, events.BEFORE_CREATE, self,
payload=events.DBEventPayload(context,
states=(subnet_data,)))
def _create_subnet_db(self, context, subnet):
with db_api.CONTEXT_WRITER.using(context):
@ -1297,8 +1299,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# add network to subnet dict to save a DB call on dhcp notification
result['network'] = mech_context.network.current
kwargs = {'context': context, 'subnet': result}
registry.notify(resources.SUBNET, events.AFTER_CREATE, self, **kwargs)
registry.publish(
resources.SUBNET, events.AFTER_CREATE, self,
payload=events.DBEventPayload(context,
resource_id=result['id'],
states=(result,)))
try:
self.mechanism_manager.create_subnet_postcommit(mech_context)
except ml2_exc.MechanismDriverError:
@ -1349,8 +1354,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# handler deleting a subresource of the subnet.
@registry.receives(resources.SUBNET, [events.PRECOMMIT_DELETE], priority=0)
def _subnet_delete_precommit_handler(self, rtype, event, trigger,
context, subnet_id, **kwargs):
subnet_obj = (kwargs.get('subnet_obj') or
payload=None):
context = payload.context
subnet_id = payload.resource_id
subnet_obj = (payload.latest_state or
self._get_subnet_object(context, subnet_id))
subnet = self._make_subnet_dict(subnet_obj, context=context)
mech_context = driver_context.SubnetContext(self, context,
@ -1362,7 +1369,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@registry.receives(resources.SUBNET, [events.AFTER_DELETE])
def _subnet_delete_after_delete_handler(self, rtype, event, trigger,
context, subnet, **kwargs):
payload):
context = payload.context
try:
self.mechanism_manager.delete_subnet_postcommit(
context._mech_context)

View File

@ -440,13 +440,13 @@ class OVNL3RouterPlugin(service_base.ServicePluginBase,
@staticmethod
@registry.receives(resources.SUBNET, [events.AFTER_UPDATE])
def _subnet_update(resource, event, trigger, **kwargs):
def _subnet_update(resource, event, trigger, payload):
l3plugin = directory.get_plugin(plugin_constants.L3)
if not l3plugin:
return
context = kwargs['context']
orig = kwargs['original_subnet']
current = kwargs['subnet']
context = payload.context
orig = payload.states[0]
current = payload.latest_state
orig_gw_ip = orig['gateway_ip']
current_gw_ip = current['gateway_ip']
if orig_gw_ip == current_gw_ip:

View File

@ -225,8 +225,9 @@ class NovaSegmentNotifier(object):
segment_host_mappings=segment_host_mappings))
@registry.receives(resources.SUBNET, [events.AFTER_CREATE])
def _notify_subnet_created(self, resource, event, trigger, context,
subnet, **kwargs):
def _notify_subnet_created(self, resource, event, trigger, payload):
context = payload.context
subnet = payload.latest_state
segment_id = subnet.get('segment_id')
if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4:
return
@ -304,8 +305,10 @@ class NovaSegmentNotifier(object):
return total, reserved
@registry.receives(resources.SUBNET, [events.AFTER_UPDATE])
def _notify_subnet_updated(self, resource, event, trigger, context,
subnet, original_subnet, **kwargs):
def _notify_subnet_updated(self, resource, event, trigger, payload):
context = payload.context
original_subnet = payload.states[0]
subnet = payload.latest_state
segment_id = subnet.get('segment_id')
original_segment_id = original_subnet.get('segment_id')
if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4:
@ -341,9 +344,10 @@ class NovaSegmentNotifier(object):
segment_host_mappings=segment_host_mappings))
@registry.receives(resources.SUBNET, [events.AFTER_DELETE])
def _notify_subnet_deleted(self, resource, event, trigger, context,
subnet, **kwargs):
if kwargs.get(db.FOR_NET_DELETE):
def _notify_subnet_deleted(self, resource, event, trigger, payload):
context = payload.context
subnet = payload.latest_state
if payload.metadata.get(db.FOR_NET_DELETE):
return # skip segment RP update if it is going to be deleted
segment_id = subnet.get('segment_id')
@ -613,8 +617,10 @@ class SegmentHostRoutes(object):
'host_routes': calc_host_routes}})
@registry.receives(resources.SUBNET, [events.BEFORE_CREATE])
def host_routes_before_create(self, resource, event, trigger, context,
subnet, **kwargs):
def host_routes_before_create(self, resource, event, trigger,
payload):
context = payload.context
subnet = payload.latest_state
segment_id = subnet.get('segment_id')
gateway_ip = subnet.get('gateway_ip')
if validators.is_attr_set(subnet.get('host_routes')):
@ -635,9 +641,11 @@ class SegmentHostRoutes(object):
subnet['host_routes'] = calc_host_routes
@registry.receives(resources.SUBNET, [events.BEFORE_UPDATE])
def host_routes_before_update(self, resource, event, trigger, **kwargs):
context = kwargs['context']
subnet, original_subnet = kwargs['request'], kwargs['original_subnet']
def host_routes_before_update(self, resource, event, trigger,
payload):
context = payload.context
original_subnet = payload.states[0]
subnet = payload.latest_state
orig_segment_id = original_subnet.get('segment_id')
segment_id = subnet.get('segment_id', orig_segment_id)
orig_gateway_ip = original_subnet.get('gateway_ip')
@ -659,9 +667,10 @@ class SegmentHostRoutes(object):
subnet['host_routes'] = calc_host_routes
@registry.receives(resources.SUBNET, [events.AFTER_CREATE])
def host_routes_after_create(self, resource, event, trigger, **kwargs):
context = kwargs['context']
subnet = kwargs['subnet']
def host_routes_after_create(self, resource, event, trigger,
payload):
context = payload.context
subnet = payload.latest_state
# If there are other subnets on the network and subnet has segment_id
# ensure host routes for all subnets are updated.
@ -671,11 +680,13 @@ class SegmentHostRoutes(object):
subnet['network_id'])
@registry.receives(resources.SUBNET, [events.AFTER_DELETE])
def host_routes_after_delete(self, resource, event, trigger, context,
subnet, **kwargs):
def host_routes_after_delete(self, resource, event, trigger,
payload):
# If this is a routed network, remove any routes to this subnet on
# this networks remaining subnets.
if kwargs.get(db.FOR_NET_DELETE):
context = payload.context
subnet = payload.latest_state
if payload.metadata.get(db.FOR_NET_DELETE):
return # skip subnet update if the network is going to be deleted
if subnet.get('segment_id'):

View File

@ -267,7 +267,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
def test__native_notification_unsubscribes(self):
self.assertFalse(self.notifier._unsubscribed_resources)
for res in (resources.PORT, resources.SUBNET):
for res in (resources.PORT,):
self.notifier._unsubscribed_resources = []
kwargs = {res: {}}
if res == resources.PORT:
@ -295,7 +295,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
context=mock.Mock(), **kwargs)
self.assertEqual([res], self.notifier._unsubscribed_resources)
for res in [resources.NETWORK]:
for res in (resources.NETWORK, resources.SUBNET):
self.notifier._unsubscribed_resources = []
registry.publish(res, events.AFTER_CREATE, self,
payload=events.DBEventPayload(mock.Mock()))

View File

@ -709,11 +709,11 @@ class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
with self.subnet() as s:
before_create.assert_called_once_with(
resources.SUBNET, events.BEFORE_CREATE, mock.ANY,
context=mock.ANY, subnet=mock.ANY)
kwargs = before_create.mock_calls[0][2]
self.assertEqual(s['subnet']['cidr'], kwargs['subnet']['cidr'])
payload=mock.ANY)
payload = before_create.mock_calls[0][2]['payload']
self.assertEqual(s['subnet']['cidr'], payload.latest_state['cidr'])
self.assertEqual(s['subnet']['network_id'],
kwargs['subnet']['network_id'])
payload.latest_state['network_id'])
def test_subnet_after_create_callback(self):
after_create = mock.Mock()
@ -721,9 +721,9 @@ class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
with self.subnet() as s:
after_create.assert_called_once_with(
resources.SUBNET, events.AFTER_CREATE, mock.ANY,
context=mock.ANY, subnet=mock.ANY)
kwargs = after_create.mock_calls[0][2]
self.assertEqual(s['subnet']['id'], kwargs['subnet']['id'])
payload=mock.ANY)
payload = after_create.mock_calls[0][2]['payload']
self.assertEqual(s['subnet']['id'], payload.resource_id)
def test_port_create_subnetnotfound(self):
with self.network() as n:
@ -779,12 +779,11 @@ class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
self.deserialize(self.fmt, req.get_response(self.api))
after_update.assert_called_once_with(
resources.SUBNET, events.AFTER_UPDATE, mock.ANY,
context=mock.ANY, subnet=mock.ANY,
original_subnet=mock.ANY)
kwargs = after_update.mock_calls[0][2]
payload=mock.ANY)
payload = after_update.mock_calls[0][2]['payload']
self.assertEqual(s['subnet']['name'],
kwargs['original_subnet']['name'])
self.assertEqual('updated', kwargs['subnet']['name'])
payload.states[0]['name'])
self.assertEqual('updated', payload.latest_state['name'])
def test_subnet_after_delete_callback(self):
after_delete = mock.Mock()
@ -794,9 +793,9 @@ class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
req.get_response(self.api)
after_delete.assert_called_once_with(
resources.SUBNET, events.AFTER_DELETE, mock.ANY,
context=mock.ANY, subnet=mock.ANY)
kwargs = after_delete.mock_calls[0][2]
self.assertEqual(s['subnet']['id'], kwargs['subnet']['id'])
payload=mock.ANY)
payload = after_delete.mock_calls[0][2]['payload']
self.assertEqual(s['subnet']['id'], payload.latest_state['id'])
def test_delete_subnet_race_with_dhcp_port_creation(self):
with self.network() as network: