Use payloads for ROUTER callbacks
This patch switches over to callback payloads for ROUTER BEFORE_CREATE, PRECOMMIT_CREATE, BEFORE_UPDATE and PRECOMMIT_DELETE events. Change-Id: I4a52c773d3f753c918df0986f1d261083156651c
This commit is contained in:
parent
19372a3cd8
commit
40c8f60ee3
|
@ -482,8 +482,11 @@ class L3NATAgent(ha.AgentMixin,
|
|||
|
||||
def _router_added(self, router_id, router):
|
||||
ri = self._create_router(router_id, router)
|
||||
registry.notify(resources.ROUTER, events.BEFORE_CREATE,
|
||||
self, router=ri)
|
||||
registry.publish(resources.ROUTER, events.BEFORE_CREATE, self,
|
||||
payload=events.DBEventPayload(
|
||||
self.context,
|
||||
resource_id=router_id,
|
||||
states=(ri,)))
|
||||
|
||||
self.router_info[router_id] = ri
|
||||
|
||||
|
@ -688,8 +691,12 @@ class L3NATAgent(ha.AgentMixin,
|
|||
self.check_ha_state_for_router(
|
||||
router['id'], router.get(lib_const.HA_ROUTER_STATE_KEY))
|
||||
ri.router = router
|
||||
registry.notify(resources.ROUTER, events.BEFORE_UPDATE,
|
||||
self, router=ri)
|
||||
registry.publish(resources.ROUTER, events.BEFORE_UPDATE, self,
|
||||
payload=events.DBEventPayload(
|
||||
self.context,
|
||||
resource_id=router['id'],
|
||||
states=(ri,)))
|
||||
|
||||
ri.process()
|
||||
registry.notify(
|
||||
resources.ROUTER, events.AFTER_UPDATE, self, router=ri)
|
||||
|
|
|
@ -369,8 +369,8 @@ def get_router_entry(ns_name, primary):
|
|||
|
||||
|
||||
@runtime.synchronized("l3-agent-pd")
|
||||
def add_router(resource, event, l3_agent, **kwargs):
|
||||
added_router = kwargs['router']
|
||||
def add_router(resource, event, l3_agent, payload):
|
||||
added_router = payload.latest_state
|
||||
router = l3_agent.pd.routers.get(added_router.router_id)
|
||||
gw_ns_name = added_router.get_gw_ns_name()
|
||||
primary = added_router.is_router_primary()
|
||||
|
|
|
@ -40,8 +40,10 @@ class RouterAvailabilityZoneMixin(l3_attrs_db.ExtraAttributesMixin):
|
|||
l3_plugin.get_router_availability_zones(router_db))
|
||||
|
||||
@registry.receives(resources.ROUTER, [events.PRECOMMIT_CREATE])
|
||||
def _process_az_request(self, resource, event, trigger, context,
|
||||
router, router_db, **kwargs):
|
||||
def _process_az_request(self, resource, event, trigger, payload):
|
||||
context = payload.context
|
||||
router = payload.latest_state
|
||||
router_db = payload.metadata['router_db']
|
||||
if az_def.AZ_HINTS in router:
|
||||
self.validate_availability_zones(context, 'router',
|
||||
router[az_def.AZ_HINTS])
|
||||
|
|
|
@ -229,8 +229,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
|
|||
"""Create the DB object."""
|
||||
router.setdefault('id', uuidutils.generate_uuid())
|
||||
router['tenant_id'] = tenant_id
|
||||
registry.notify(resources.ROUTER, events.BEFORE_CREATE,
|
||||
self, context=context, router=router)
|
||||
|
||||
registry.publish(resources.ROUTER, events.BEFORE_CREATE, self,
|
||||
payload=events.DBEventPayload(
|
||||
context,
|
||||
resource_id=router['id'],
|
||||
states=(router,)))
|
||||
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
# pre-generate id so it will be available when
|
||||
# configuring external gw port
|
||||
|
@ -242,9 +247,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
|
|||
status=constants.ACTIVE,
|
||||
description=router.get('description'))
|
||||
context.session.add(router_db)
|
||||
registry.notify(resources.ROUTER, events.PRECOMMIT_CREATE,
|
||||
self, context=context, router=router,
|
||||
router_id=router['id'], router_db=router_db)
|
||||
|
||||
registry.publish(resources.ROUTER, events.PRECOMMIT_CREATE, self,
|
||||
payload=events.DBEventPayload(
|
||||
context,
|
||||
resource_id=router['id'],
|
||||
metadata={'router_db': router_db},
|
||||
states=(router,)))
|
||||
return router_db
|
||||
|
||||
def _update_gw_for_create_router(self, context, gw_info, router_id):
|
||||
|
@ -562,9 +571,11 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
|
|||
l3_port_check=False)
|
||||
|
||||
router = self._get_router(context, id)
|
||||
registry.notify(resources.ROUTER, events.PRECOMMIT_DELETE,
|
||||
self, context=context, router_db=router,
|
||||
router_id=id)
|
||||
registry.publish(resources.ROUTER, events.PRECOMMIT_DELETE, self,
|
||||
payload=events.DBEventPayload(
|
||||
context,
|
||||
resource_id=id,
|
||||
states=(router,)))
|
||||
# we bump the revision even though we are about to delete to throw
|
||||
# staledataerror if something stuck in with a new interface
|
||||
router.bump_revision()
|
||||
|
|
|
@ -101,9 +101,11 @@ class DVRResourceOperationHandler(object):
|
|||
|
||||
@registry.receives(resources.ROUTER, [events.PRECOMMIT_CREATE],
|
||||
priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE)
|
||||
def _set_distributed_flag(self, resource, event, trigger, context,
|
||||
router, router_db, **kwargs):
|
||||
def _set_distributed_flag(self, resource, event, trigger, payload):
|
||||
"""Event handler to set distributed flag on creation."""
|
||||
context = payload.context
|
||||
router = payload.latest_state
|
||||
router_db = payload.metadata['router_db']
|
||||
dist = is_distributed_router(router)
|
||||
router['distributed'] = dist
|
||||
self.l3plugin.set_extra_attr_value(context, router_db, 'distributed',
|
||||
|
@ -148,9 +150,12 @@ class DVRResourceOperationHandler(object):
|
|||
# Notify advanced services of the imminent state transition
|
||||
# for the router.
|
||||
try:
|
||||
kwargs = {'context': context, 'router': router_db}
|
||||
registry.notify(
|
||||
resources.ROUTER, events.BEFORE_UPDATE, self, **kwargs)
|
||||
registry.publish(
|
||||
resources.ROUTER, events.BEFORE_UPDATE, self,
|
||||
payload=events.DBEventPayload(
|
||||
context,
|
||||
resource_id=router_db['id'],
|
||||
states=(old_router, router_db,)))
|
||||
except exceptions.CallbackFailure as e:
|
||||
# NOTE(armax): preserve old check's behavior
|
||||
if len(e.errors) == 1:
|
||||
|
|
|
@ -353,9 +353,13 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
|||
|
||||
@registry.receives(resources.ROUTER, [events.BEFORE_CREATE],
|
||||
priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE)
|
||||
def _before_router_create_handler(self, resource, event, trigger,
|
||||
payload):
|
||||
return self._before_router_create(event, payload.context,
|
||||
payload.latest_state)
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def _before_router_create(self, resource, event, trigger,
|
||||
context, router, **kwargs):
|
||||
def _before_router_create(self, event, context, router):
|
||||
"""Event handler to create HA resources before router creation."""
|
||||
if not self._is_ha(router):
|
||||
return
|
||||
|
@ -367,9 +371,11 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
|||
|
||||
@registry.receives(resources.ROUTER, [events.PRECOMMIT_CREATE],
|
||||
priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE)
|
||||
def _precommit_router_create(self, resource, event, trigger, context,
|
||||
router, router_db, **kwargs):
|
||||
def _precommit_router_create(self, resource, event, trigger, payload):
|
||||
"""Event handler to set ha flag and status on creation."""
|
||||
context = payload.context
|
||||
router = payload.latest_state
|
||||
router_db = payload.metadata['router_db']
|
||||
is_ha = self._is_ha(router)
|
||||
router['ha'] = is_ha
|
||||
self.set_extra_attr_value(context, router_db, 'ha', is_ha)
|
||||
|
@ -510,9 +516,10 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
|||
|
||||
@registry.receives(resources.ROUTER, [events.PRECOMMIT_DELETE],
|
||||
priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE)
|
||||
def _release_router_vr_id(self, resource, event, trigger, context,
|
||||
router_db, **kwargs):
|
||||
def _release_router_vr_id(self, resource, event, trigger, payload):
|
||||
"""Event handler for removal of VRID during router delete."""
|
||||
context = payload.context
|
||||
router_db = payload.latest_state
|
||||
if router_db.extra_attributes.ha:
|
||||
ha_network = self.get_ha_network(context,
|
||||
router_db.tenant_id)
|
||||
|
|
|
@ -67,27 +67,31 @@ class DriverController(object):
|
|||
|
||||
@registry.receives(resources.ROUTER, [events.BEFORE_CREATE],
|
||||
priority_group.PRIORITY_ROUTER_CONTROLLER)
|
||||
def _check_router_request(self, resource, event, trigger, context,
|
||||
router, **kwargs):
|
||||
def _check_router_request(self, resource, event, trigger, payload):
|
||||
"""Validates that API request is sane (flags compat with flavor)."""
|
||||
context = payload.context
|
||||
router = payload.latest_state
|
||||
drv = self._get_provider_for_create(context, router)
|
||||
_ensure_driver_supports_request(drv, router)
|
||||
|
||||
@registry.receives(resources.ROUTER, [events.PRECOMMIT_CREATE],
|
||||
priority_group.PRIORITY_ROUTER_CONTROLLER)
|
||||
def _set_router_provider(self, resource, event, trigger, context, router,
|
||||
router_db, **kwargs):
|
||||
def _set_router_provider(self, resource, event, trigger, payload):
|
||||
"""Associates a router with a service provider.
|
||||
|
||||
Association is done by flavor_id if it's specified, otherwise it will
|
||||
fallback to determining which loaded driver supports the ha/distributed
|
||||
attributes associated with the router.
|
||||
"""
|
||||
context = payload.context
|
||||
router = payload.latest_state
|
||||
router_db = payload.metadata['router_db']
|
||||
router_id = payload.resource_id
|
||||
if _flavor_specified(router):
|
||||
router_db.flavor_id = router['flavor_id']
|
||||
drv = self._get_provider_for_create(context, router)
|
||||
self._stm.add_resource_association(context, plugin_constants.L3,
|
||||
drv.name, router['id'])
|
||||
drv.name, router_id)
|
||||
registry.publish(
|
||||
resources.ROUTER_CONTROLLER, events.PRECOMMIT_ADD_ASSOCIATION,
|
||||
trigger, payload=events.DBEventPayload(
|
||||
|
@ -97,9 +101,10 @@ class DriverController(object):
|
|||
|
||||
@registry.receives(resources.ROUTER, [events.PRECOMMIT_DELETE],
|
||||
priority_group.PRIORITY_ROUTER_CONTROLLER)
|
||||
def _clear_router_provider(self, resource, event, trigger, context,
|
||||
router_id, **kwargs):
|
||||
def _clear_router_provider(self, resource, event, trigger, payload):
|
||||
"""Remove the association between a router and a service provider."""
|
||||
context = payload.context
|
||||
router_id = payload.resource_id
|
||||
drv = self.get_provider_for_router(context, router_id)
|
||||
registry.publish(
|
||||
resources.ROUTER_CONTROLLER, events.PRECOMMIT_DELETE_ASSOCIATIONS,
|
||||
|
|
|
@ -151,8 +151,11 @@ class OVNL3RouterPlugin(service_base.ServicePluginBase,
|
|||
return ("L3 Router Service Plugin for basic L3 forwarding"
|
||||
" using OVN")
|
||||
|
||||
def create_router_precommit(self, resource, event, trigger, context,
|
||||
router, router_id, router_db):
|
||||
def create_router_precommit(self, resource, event, trigger, payload):
|
||||
context = payload.context
|
||||
router_id = payload.resource_id
|
||||
router_db = payload.metadata['router_db']
|
||||
|
||||
db_rev.create_initial_revision(
|
||||
context, router_id, ovn_const.TYPE_ROUTERS,
|
||||
std_attr_id=router_db.standard_attr.id)
|
||||
|
|
|
@ -65,9 +65,9 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
|
|||
check.assert_called_once_with(router.router_id, None)
|
||||
|
||||
expected_calls = [
|
||||
mock.call('router', 'before_create', self.agent, router=router),
|
||||
mock.call('router', 'before_create', self.agent, payload=mock.ANY),
|
||||
mock.call('router', 'after_create', self.agent, router=router),
|
||||
mock.call('router', 'before_update', self.agent, router=router),
|
||||
mock.call('router', 'before_update', self.agent, payload=mock.ANY),
|
||||
mock.call('router', 'after_update', self.agent, router=router),
|
||||
mock.call('router', 'before_delete', self.agent, payload=mock.ANY),
|
||||
mock.call('router', 'after_delete', self.agent, router=router)]
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
|
||||
from unittest import mock
|
||||
|
||||
from neutron_lib.callbacks import events
|
||||
|
||||
from neutron.agent.l3 import dvr_edge_router
|
||||
from neutron.agent.l3 import dvr_local_router
|
||||
from neutron.agent.l3 import legacy_router
|
||||
|
@ -36,7 +38,9 @@ class TestPrefixDelegation(tests_base.DietTestCase):
|
|||
|
||||
def _test_add_update_pd(self, l3_agent, router, ns_name):
|
||||
# add entry
|
||||
pd.add_router(None, None, l3_agent, router=router)
|
||||
pd.add_router(None, None, l3_agent,
|
||||
payload=events.DBEventPayload(
|
||||
mock.ANY, states=(router,)))
|
||||
pd_router = l3_agent.pd.routers.get(router.router_id)
|
||||
self.assertEqual(ns_name, pd_router.get('ns_name'))
|
||||
|
||||
|
|
|
@ -1235,12 +1235,16 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
|
|||
def _test__validate_router_migration_notify_advanced_services(self):
|
||||
router = {'name': 'foo_router', 'admin_state_up': False}
|
||||
router_db = self._create_router(router)
|
||||
with mock.patch.object(l3_dvr_db.registry, 'notify') as mock_notify:
|
||||
with mock.patch.object(l3_dvr_db.registry, 'publish') as mock_publish:
|
||||
self.mixin._validate_router_migration(
|
||||
self.ctx, router_db, {'distributed': True})
|
||||
kwargs = {'context': self.ctx, 'router': router_db}
|
||||
mock_notify.assert_called_once_with(
|
||||
'router', 'before_update', self.mixin, **kwargs)
|
||||
mock_publish.assert_called_once_with(
|
||||
'router', 'before_update', self.mixin,
|
||||
payload=mock.ANY)
|
||||
|
||||
payload = mock_publish.call_args_list[0][1]['payload']
|
||||
self.assertEqual(self.ctx, payload.context)
|
||||
self.assertEqual(router_db, payload.latest_state)
|
||||
|
||||
def _assert_mock_called_with_router(self, mock_fn, router_id):
|
||||
router = mock_fn.call_args[1].get('router_db')
|
||||
|
|
|
@ -4044,7 +4044,8 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
|
|||
events.AFTER_DELETE)
|
||||
|
||||
def test_router_create_precommit_event(self):
|
||||
nset = lambda *a, **k: setattr(k['router_db'], 'name', 'hello')
|
||||
nset = lambda r, e, t, payload: \
|
||||
setattr(payload.metadata['router_db'], 'name', 'hello')
|
||||
registry.subscribe(nset, resources.ROUTER, events.PRECOMMIT_CREATE)
|
||||
with self.router() as r:
|
||||
self.assertEqual('hello', r['router']['name'])
|
||||
|
@ -4089,7 +4090,8 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
|
|||
|
||||
def test_router_delete_precommit_event(self):
|
||||
deleted = []
|
||||
auditor = lambda *a, **k: deleted.append(k['router_id'])
|
||||
auditor = lambda r, e, t, payload: \
|
||||
deleted.append(payload.resource_id)
|
||||
registry.subscribe(auditor, resources.ROUTER, events.PRECOMMIT_DELETE)
|
||||
with self.router() as r:
|
||||
self._delete('routers', r['router']['id'])
|
||||
|
|
|
@ -59,7 +59,11 @@ class TestDriverController(testlib_api.SqlTestCase):
|
|||
router_id = uuidutils.generate_uuid()
|
||||
router = dict(id=router_id, flavor_id=flavor_id)
|
||||
self.dc._set_router_provider('router', 'PRECOMMIT_CREATE', self,
|
||||
self.ctx, router, router_db)
|
||||
payload=events.DBEventPayload(
|
||||
self.ctx,
|
||||
resource_id=router_id,
|
||||
metadata={'router_db': router_db},
|
||||
states=(router,)))
|
||||
self.assertTrue(self.dc.uses_scheduler(self.ctx, router_id))
|
||||
self.dc.drivers['dvrha'].use_integrated_agent_scheduler = False
|
||||
self.assertFalse(self.dc.uses_scheduler(self.ctx, router_id))
|
||||
|
@ -72,7 +76,11 @@ class TestDriverController(testlib_api.SqlTestCase):
|
|||
r2 = uuidutils.generate_uuid()
|
||||
router = dict(id=r1, flavor_id=flavor_id)
|
||||
self.dc._set_router_provider('router', 'PRECOMMIT_CREATE', self,
|
||||
self.ctx, router, router_db)
|
||||
payload=events.DBEventPayload(
|
||||
self.ctx,
|
||||
resource_id=r1,
|
||||
metadata={'router_db': router_db},
|
||||
states=(router,)))
|
||||
self.assertTrue(self.dc.drivers['dvrha'].owns_router(self.ctx, r1))
|
||||
self.assertFalse(self.dc.drivers['dvr'].owns_router(self.ctx, r1))
|
||||
self.assertFalse(self.dc.drivers['dvr'].owns_router(self.ctx, r2))
|
||||
|
@ -86,7 +94,11 @@ class TestDriverController(testlib_api.SqlTestCase):
|
|||
router_id = uuidutils.generate_uuid()
|
||||
router = dict(id=router_id, flavor_id=flavor_id)
|
||||
self.dc._set_router_provider('router', 'PRECOMMIT_CREATE', self,
|
||||
self.ctx, router, router_db)
|
||||
payload=events.DBEventPayload(
|
||||
self.ctx,
|
||||
resource_id=router_id,
|
||||
metadata={'router_db': router_db},
|
||||
states=(router,)))
|
||||
mock_cb.assert_called_with(resources.ROUTER_CONTROLLER,
|
||||
events.PRECOMMIT_ADD_ASSOCIATION, mock.ANY,
|
||||
payload=mock.ANY)
|
||||
|
@ -190,7 +202,12 @@ class TestDriverController(testlib_api.SqlTestCase):
|
|||
]
|
||||
for driver, body in cases:
|
||||
self.dc._set_router_provider('router', 'PRECOMMIT_CREATE', self,
|
||||
self.ctx, body, mock.Mock())
|
||||
payload=events.DBEventPayload(
|
||||
self.ctx,
|
||||
resource_id=body['id'],
|
||||
metadata={
|
||||
'router_db': mock.Mock()},
|
||||
states=(body,)))
|
||||
mock_cb.assert_called_with(
|
||||
resources.ROUTER_CONTROLLER,
|
||||
events.PRECOMMIT_ADD_ASSOCIATION, mock.ANY,
|
||||
|
@ -206,7 +223,11 @@ class TestDriverController(testlib_api.SqlTestCase):
|
|||
router_id1 = uuidutils.generate_uuid()
|
||||
body = dict(id=router_id1, distributed=True, ha=True)
|
||||
self.dc._set_router_provider('router', 'PRECOMMIT_CREATE', self,
|
||||
self.ctx, body, mock.Mock())
|
||||
payload=events.DBEventPayload(
|
||||
self.ctx,
|
||||
resource_id=router_id1,
|
||||
metadata={'router_db': mock.Mock()},
|
||||
states=(body,)))
|
||||
mock_cb.assert_called_with(resources.ROUTER_CONTROLLER,
|
||||
events.PRECOMMIT_ADD_ASSOCIATION, mock.ANY,
|
||||
payload=mock.ANY)
|
||||
|
@ -219,7 +240,9 @@ class TestDriverController(testlib_api.SqlTestCase):
|
|||
self.dc.get_provider_for_router(self.ctx,
|
||||
body['id']))
|
||||
self.dc._clear_router_provider('router', 'PRECOMMIT_DELETE', self,
|
||||
self.ctx, body['id'])
|
||||
payload=events.DBEventPayload(
|
||||
self.ctx,
|
||||
resource_id=body['id']))
|
||||
mock_cb.assert_called_with(resources.ROUTER_CONTROLLER,
|
||||
events.PRECOMMIT_DELETE_ASSOCIATIONS, mock.ANY,
|
||||
payload=mock.ANY)
|
||||
|
|
Loading…
Reference in New Issue