use callback payloads for PRECOMMIT_UPDATE events

This patch switches callbacks over to the payload object style events
[1] for PRECOMMIT_UPDATE based notifications. To do so, a DBEventPayload
object is used with the publish() method to pass along the related data.
In addition, a few unit tests are updated to work with the changes. Finally,
a few shims are put into place to allow PRECOMMIT_UPDATE based events to
use payloads while still supporting the existing kwarg style events.

NeutronLibImpact

[1] https://docs.openstack.org/neutron-lib/latest/contributor/callbacks.html#event-payloads

Change-Id: Ie6d27df01cd7b87894efc80946d41eb1ebe25bef
This commit is contained in:
Boden R 2017-12-18 13:53:27 -07:00
parent f33fc5fcb3
commit 537bfb9a1c
16 changed files with 192 additions and 124 deletions

View File

@ -265,10 +265,11 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
old_router = self._make_router_dict(router_db)
if data:
router_db.update(data)
registry.notify(resources.ROUTER, events.PRECOMMIT_UPDATE,
self, context=context, router_id=router_id,
router=data, router_db=router_db,
old_router=old_router)
registry.publish(resources.ROUTER, events.PRECOMMIT_UPDATE, self,
payload=events.DBEventPayload(
context, request_body=data,
states=(old_router,), resource_id=router_id,
desired_state=router_db))
return router_db
@db_api.retry_if_session_inactive()

View File

@ -106,42 +106,46 @@ class DVRResourceOperationHandler(object):
return True
@registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE])
def _handle_distributed_migration(self, resource, event, trigger, context,
router_id, router, router_db, old_router,
**kwargs):
def _handle_distributed_migration(self, resource, event,
trigger, payload=None):
"""Event handler for router update migration to distributed."""
if not self._validate_router_migration(context, router_db, router):
if not self._validate_router_migration(
payload.context, payload.desired_state,
payload.request_body):
return
migrating_to_distributed = (
not router_db.extra_attributes.distributed and
router.get('distributed') is True)
not payload.desired_state.extra_attributes.distributed and
payload.request_body.get('distributed') is True)
if migrating_to_distributed:
if old_router['ha']:
if payload.states[0]['ha']:
old_owner = const.DEVICE_OWNER_HA_REPLICATED_INT
else:
old_owner = const.DEVICE_OWNER_ROUTER_INTF
self.l3plugin._migrate_router_ports(
context, router_db,
payload.context, payload.desired_state,
old_owner=old_owner,
new_owner=const.DEVICE_OWNER_DVR_INTERFACE)
else:
if router.get('ha'):
if payload.request_body.get('ha'):
new_owner = const.DEVICE_OWNER_HA_REPLICATED_INT
else:
new_owner = const.DEVICE_OWNER_ROUTER_INTF
self.l3plugin._migrate_router_ports(
context, router_db,
payload.context, payload.desired_state,
old_owner=const.DEVICE_OWNER_DVR_INTERFACE,
new_owner=new_owner)
cur_agents = self.l3plugin.list_l3_agents_hosting_router(
context, router_db['id'])['agents']
payload.context, payload.resource_id)['agents']
for agent in cur_agents:
self.l3plugin._unbind_router(context, router_db['id'], agent['id'])
self.l3plugin._unbind_router(
payload.context, payload.resource_id,
agent['id'])
self.l3plugin.set_extra_attr_value(
context, router_db, 'distributed', migrating_to_distributed)
payload.context, payload.desired_state,
'distributed', migrating_to_distributed)
@registry.receives(resources.ROUTER, [events.AFTER_UPDATE])
def _delete_snat_interfaces_after_change(self, resource, event, trigger,

View File

@ -397,20 +397,18 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
{'status': constants.ERROR})['status']
@registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE])
def _validate_migration(self, resource, event, trigger, context,
router_id, router, router_db, old_router,
**kwargs):
def _validate_migration(self, resource, event, trigger, payload=None):
"""Event handler on precommit update to validate migration."""
original_ha_state = old_router['ha']
requested_ha_state = router.get('ha')
original_ha_state = payload.states[0]['ha']
requested_ha_state = payload.request_body.get('ha')
ha_changed = (requested_ha_state is not None and
requested_ha_state != original_ha_state)
if not ha_changed:
return
if router_db.admin_state_up:
if payload.desired_state.admin_state_up:
msg = _('Cannot change HA attribute of active routers. Please '
'set router admin_state_up to False prior to upgrade')
raise n_exc.BadRequest(resource='router', msg=msg)
@ -418,29 +416,32 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
if requested_ha_state:
# This will throw HANotEnoughAvailableAgents if there aren't
# enough l3 agents to handle this router.
self.get_number_of_agents_for_scheduling(context)
self.get_number_of_agents_for_scheduling(payload.context)
else:
ha_network = self.get_ha_network(context,
router_db.tenant_id)
ha_network = self.get_ha_network(payload.context,
payload.desired_state.tenant_id)
self._delete_vr_id_allocation(
context, ha_network, router_db.extra_attributes.ha_vr_id)
router_db.extra_attributes.ha_vr_id = None
if router.get('distributed') or old_router['distributed']:
self.set_extra_attr_value(context, router_db,
payload.context, ha_network,
payload.desired_state.extra_attributes.ha_vr_id)
payload.desired_state.extra_attributes.ha_vr_id = None
if (payload.request_body.get('distributed') or
payload.states[0]['distributed']):
self.set_extra_attr_value(payload.context, payload.desired_state,
'ha', requested_ha_state)
return
if requested_ha_state:
self._migrate_router_ports(
context, router_db,
payload.context, payload.desired_state,
old_owner=constants.DEVICE_OWNER_ROUTER_INTF,
new_owner=constants.DEVICE_OWNER_HA_REPLICATED_INT)
else:
self._migrate_router_ports(
context, router_db,
payload.context, payload.desired_state,
old_owner=constants.DEVICE_OWNER_HA_REPLICATED_INT,
new_owner=constants.DEVICE_OWNER_ROUTER_INTF)
self.set_extra_attr_value(context, router_db, 'ha', requested_ha_state)
self.set_extra_attr_value(
payload.context, payload.desired_state, 'ha', requested_ha_state)
@registry.receives(resources.ROUTER, [events.AFTER_UPDATE])
def _reconfigure_ha_resources(self, resource, event, trigger, context,

View File

@ -56,7 +56,11 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
# from being processed. This is a hook point for backend's validation;
# we raise to propagate the reason for the failure.
try:
registry.notify(res, event, self, **kwargs)
if 'payload' in kwargs:
# TODO(boden): remove shim once all callbacks use payloads
registry.publish(res, event, self, payload=kwargs['payload'])
else:
registry.notify(res, event, self, **kwargs)
except exceptions.CallbackFailure as e:
if exc_cls:
reason = (_('cannot perform %(event)s due to %(reason)s') %
@ -261,8 +265,11 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
self._registry_notify(
resources.SECURITY_GROUP,
events.PRECOMMIT_UPDATE,
exc_cls=ext_sg.SecurityGroupConflict, **kwargs)
exc_cls=ext_sg.SecurityGroupConflict,
payload=events.DBEventPayload(
context, request_body=s,
states=(kwargs['original_security_group'],),
resource_id=id, desired_state=sg_dict))
registry.notify(resources.SECURITY_GROUP, events.AFTER_UPDATE, self,
**kwargs)
return sg_dict

View File

@ -899,11 +899,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.type_manager.extend_network_dict_provider(
context, updated_network)
kwargs = {'context': context, 'network': updated_network,
'original_network': original_network,
'request': net_data}
registry.notify(
resources.NETWORK, events.PRECOMMIT_UPDATE, self, **kwargs)
registry.publish(resources.NETWORK, events.PRECOMMIT_UPDATE, self,
payload=events.DBEventPayload(
context, request_body=net_data,
states=(original_network,),
resource_id=id,
desired_state=updated_network))
# TODO(QoS): Move out to the extension framework somehow.
need_network_update_notify |= (
@ -1347,13 +1348,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
need_port_update_notify |= self._process_port_binding(
mech_context, attrs)
kwargs = {
'context': context,
'port': updated_port,
'original_port': original_port,
}
registry.notify(
resources.PORT, events.PRECOMMIT_UPDATE, self, **kwargs)
registry.publish(
resources.PORT, events.PRECOMMIT_UPDATE, self,
payload=events.DBEventPayload(
context, request_body=attrs, states=(original_port,),
resource_id=id, desired_state=updated_port))
# For DVR router interface ports we need to retrieve the
# DVRPortbinding context instead of the normal port context.

View File

@ -42,33 +42,50 @@ LOG = logging.getLogger(__name__)
CHECK_REQUIREMENTS = 'dry-run'
@db_api.retry_if_session_inactive()
def _ensure_external_network_default_value_callback(
resource, event, trigger, context, request, network, **kwargs):
resource, event, trigger, **kwargs):
"""Ensure the is_default db field matches the create/update request."""
is_default = request.get(api_const.IS_DEFAULT)
if is_default is None:
return
if is_default:
# ensure there is only one default external network at any given time
pager = base_obj.Pager(limit=1)
objs = net_obj.ExternalNetwork.get_objects(context,
_pager=pager, is_default=True)
if objs:
if objs[0] and network['id'] != objs[0].network_id:
raise exceptions.DefaultExternalNetworkExists(
net_id=objs[0].network_id)
orig = kwargs.get('original_network')
if orig and orig.get(api_const.IS_DEFAULT) == is_default:
return
network[api_const.IS_DEFAULT] = is_default
# Reflect the status of the is_default on the create/update request
obj = net_obj.ExternalNetwork.get_object(context,
network_id=network['id'])
if obj:
obj.is_default = is_default
obj.update()
# TODO(boden): remove shim once all callbacks use payloads
if 'payload' in kwargs:
_request = kwargs['payload'].request_body
_context = kwargs['payload'].context
_network = kwargs['payload'].desired_state
_orig = kwargs['payload'].states[0]
else:
_request = kwargs['request']
_context = kwargs['context']
_network = kwargs['network']
_orig = kwargs.get('original_network')
@db_api.retry_if_session_inactive()
def _do_ensure_external_network_default_value_callback(
context, request, orig, network):
is_default = request.get(api_const.IS_DEFAULT)
if is_default is None:
return
if is_default:
# ensure only one default external network at any given time
pager = base_obj.Pager(limit=1)
objs = net_obj.ExternalNetwork.get_objects(context,
_pager=pager, is_default=True)
if objs:
if objs[0] and network['id'] != objs[0].network_id:
raise exceptions.DefaultExternalNetworkExists(
net_id=objs[0].network_id)
if orig and orig.get(api_const.IS_DEFAULT) == is_default:
return
network[api_const.IS_DEFAULT] = is_default
# Reflect the status of the is_default on the create/update request
obj = net_obj.ExternalNetwork.get_object(context,
network_id=network['id'])
if obj:
obj.is_default = is_default
obj.update()
_do_ensure_external_network_default_value_callback(
_context, _request, _orig, _network)
@resource_extend.has_resource_extenders

View File

@ -92,19 +92,19 @@ class DriverController(object):
self._stm.del_resource_associations(context, [router_id])
@registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE])
def _update_router_provider(self, resource, event, trigger, context,
router_id, router, old_router, router_db,
**kwargs):
def _update_router_provider(self, resource, event, trigger, payload=None):
"""Handle transition between providers.
The provider can currently be changed only by the caller updating
'ha' and/or 'distributed' attributes. If we allow updates of flavor_id
directly in the future those requests will also land here.
"""
drv = self.get_provider_for_router(context, router_id)
drv = self.get_provider_for_router(payload.context,
payload.resource_id)
new_drv = None
if _flavor_specified(router):
if router['flavor_id'] != old_router['flavor_id']:
if _flavor_specified(payload.request_body):
if (payload.request_body['flavor_id'] !=
payload.states[0]['flavor_id']):
# TODO(kevinbenton): this is currently disallowed by the API
# so we shouldn't hit it but this is a placeholder to add
# support later.
@ -113,7 +113,7 @@ class DriverController(object):
# the following is to support updating the 'ha' and 'distributed'
# attributes via the API.
try:
_ensure_driver_supports_request(drv, router)
_ensure_driver_supports_request(drv, payload.request_body)
except lib_exc.InvalidInput:
# the current driver does not support this request, we need to
# migrate to a new provider. populate the distributed and ha
@ -123,25 +123,29 @@ class DriverController(object):
# we bail because changing the provider without changing
# the flavor will make things inconsistent. We can probably
# update the flavor automatically in the future.
if old_router['flavor_id']:
if payload.states[0]['flavor_id']:
raise lib_exc.InvalidInput(error_message=_(
"Changing the 'ha' and 'distributed' attributes on a "
"router associated with a flavor is not supported"))
if 'distributed' not in router:
router['distributed'] = old_router['distributed']
if 'ha' not in router:
router['ha'] = old_router['distributed']
new_drv = self._attrs_to_driver(router)
if 'distributed' not in payload.request_body:
payload.request_body['distributed'] = (payload.states[0]
['distributed'])
if 'ha' not in payload.request_body:
payload.request_body['ha'] = payload.states[0]['distributed']
new_drv = self._attrs_to_driver(payload.request_body)
if new_drv:
LOG.debug("Router %(id)s migrating from %(old)s provider to "
"%(new)s provider.", {'id': router_id, 'old': drv,
"%(new)s provider.", {'id': payload.resource_id,
'old': drv,
'new': new_drv})
_ensure_driver_supports_request(new_drv, router)
_ensure_driver_supports_request(new_drv, payload.request_body)
# TODO(kevinbenton): notify old driver explicitly of driver change
with context.session.begin(subtransactions=True):
self._stm.del_resource_associations(context, [router_id])
with payload.context.session.begin(subtransactions=True):
self._stm.del_resource_associations(
payload.context, [payload.resource_id])
self._stm.add_resource_association(
context, plugin_constants.L3, new_drv.name, router_id)
payload.context, plugin_constants.L3,
new_drv.name, payload.resource_id)
def get_provider_for_router(self, context, router_id):
"""Return the provider driver handle for a router id."""

View File

@ -98,28 +98,27 @@ class QoSPlugin(qos.QoSPluginBase):
self.validate_policy_for_port(policy, port)
def _validate_update_port_callback(self, resource, event, trigger,
**kwargs):
context = kwargs['context']
original_policy_id = kwargs['original_port'].get(
payload=None):
context = payload.context
original_policy_id = payload.states[0].get(
qos_consts.QOS_POLICY_ID)
policy_id = kwargs['port'].get(qos_consts.QOS_POLICY_ID)
policy_id = payload.desired_state.get(qos_consts.QOS_POLICY_ID)
if policy_id is None or policy_id == original_policy_id:
return
updated_port = ports_object.Port.get_object(
context, id=kwargs['port']['id'])
context, id=payload.desired_state['id'])
policy = policy_object.QosPolicy.get_object(
context.elevated(), id=policy_id)
self.validate_policy_for_port(policy, updated_port)
def _validate_update_network_callback(self, resource, event, trigger,
**kwargs):
context = kwargs['context']
original_network = kwargs['original_network']
updated_network = kwargs['network']
payload=None):
context = payload.context
original_network = payload.states[0]
updated_network = payload.desired_state
original_policy_id = original_network.get(qos_consts.QOS_POLICY_ID)
policy_id = updated_network.get(qos_consts.QOS_POLICY_ID)

View File

@ -14,6 +14,7 @@
# limitations under the License.
# TODO(boden): remove this once moved over to neutron-lib payloads
class TrunkPayload(object):
"""Payload for trunk-related callback registry notifications."""

View File

@ -245,13 +245,16 @@ class TrunkPlugin(service_base.ServicePluginBase,
# these are DB properties only.
trunk_obj.update_fields(trunk_data, reset_changes=True)
trunk_obj.update()
payload = callbacks.TrunkPayload(context, trunk_id,
original_trunk=original_trunk,
current_trunk=trunk_obj)
registry.notify(constants.TRUNK, events.PRECOMMIT_UPDATE, self,
payload=payload)
payload = events.DBEventPayload(
context, resource_id=trunk_id, states=(original_trunk,),
desired_state=trunk_obj, request_body=trunk_data)
registry.publish(constants.TRUNK, events.PRECOMMIT_UPDATE, self,
payload=payload)
registry.notify(constants.TRUNK, events.AFTER_UPDATE, self,
payload=payload)
payload=callbacks.TrunkPayload(
context, trunk_id,
original_trunk=original_trunk,
current_trunk=trunk_obj))
return trunk_obj
def delete_trunk(self, context, trunk_id):

View File

@ -331,16 +331,19 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
original_sg_dict = self.mixin.create_security_group(self.ctx,
FAKE_SECGROUP)
sg_id = original_sg_dict['id']
with mock.patch.object(registry, "notify") as mock_notify:
with mock.patch.object(registry, "publish") as mock_notify:
fake_secgroup = copy.deepcopy(FAKE_SECGROUP)
fake_secgroup['security_group']['name'] = 'updated_fake'
sg_dict = self.mixin.update_security_group(
self.ctx, sg_id, fake_secgroup)
mock_notify.assert_has_calls([mock.call('security_group',
'precommit_update', mock.ANY, context=mock.ANY,
original_security_group=original_sg_dict,
security_group=sg_dict,
security_group_id=sg_id)])
mock_notify.assert_has_calls(
[mock.call('security_group', 'precommit_update', mock.ANY,
payload=mock.ANY)])
payload = mock_notify.call_args[1]['payload']
self.assertEqual(original_sg_dict, payload.states[0])
self.assertEqual(sg_id, payload.resource_id)
self.assertEqual(sg_dict, payload.desired_state)
def test_security_group_precommit_and_after_delete_event(self):
sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)

View File

@ -3675,9 +3675,12 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
self.assertFalse(body['routers'])
def test_router_update_precommit_event(self):
nset = lambda *a, **k: setattr(k['router_db'], 'name',
k['old_router']['name'] + '_ha!')
registry.subscribe(nset, resources.ROUTER, events.PRECOMMIT_UPDATE)
def _nset(r, v, s, payload=None):
setattr(payload.desired_state, 'name',
payload.states[0]['name'] + '_ha!')
registry.subscribe(_nset, resources.ROUTER, events.PRECOMMIT_UPDATE)
with self.router(name='original') as r:
update = self._update('routers', r['router']['id'],
{'router': {'name': 'hi'}})

View File

@ -245,8 +245,10 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
self.deserialize(self.fmt, req.get_response(self.api))
precommit_update.assert_called_once_with(
resources.NETWORK, events.PRECOMMIT_UPDATE, mock.ANY,
context=mock.ANY, network=mock.ANY, original_network=mock.ANY,
request=mock.ANY)
payload=mock.ANY)
self.assertEqual(
'updated',
precommit_update.call_args[1]['payload'].desired_state['name'])
def test_network_after_update_callback(self):
after_update = mock.Mock()

View File

@ -13,6 +13,7 @@
# under the License.
import mock
from neutron_lib.callbacks import events
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions as lib_exc
@ -97,9 +98,10 @@ class TestDriverController(testlib_api.SqlTestCase):
self.assertRaises(
lib_exc.InvalidInput,
test_dc._update_router_provider,
None, None, None, None,
None, {'name': 'testname'},
{'flavor_id': 'old_fid'}, None)
None, None, None,
payload=events.DBEventPayload(
None, request_body={'name': 'testname'},
states=({'flavor_id': 'old_fid'},)))
def test__set_router_provider_attr_lookups(self):
# ensure correct drivers are looked up based on attrs

View File

@ -13,6 +13,7 @@
import copy
import mock
from neutron_lib.callbacks import events
from neutron_lib import context
from neutron_lib import exceptions as lib_exc
from neutron_lib.plugins import constants as plugins_constants
@ -193,7 +194,6 @@ class TestQosPlugin(base.BaseQosTestCase):
original_policy_id=None):
port_id = uuidutils.generate_uuid()
kwargs = {
"context": self.ctxt,
"port": {
"id": port_id,
qos_consts.QOS_POLICY_ID: policy_id
@ -218,7 +218,10 @@ class TestQosPlugin(base.BaseQosTestCase):
self.ctxt, "elevated", return_value=admin_ctxt
):
self.qos_plugin._validate_update_port_callback(
"PORT", "precommit_update", "test_plugin", **kwargs)
"PORT", "precommit_update", "test_plugin",
payload=events.DBEventPayload(
self.ctxt, desired_state=kwargs['port'],
states=(kwargs['original_port'],)))
if policy_id is None or policy_id == original_policy_id:
get_port.assert_not_called()
get_policy.assert_not_called()
@ -276,7 +279,10 @@ class TestQosPlugin(base.BaseQosTestCase):
self.ctxt, "elevated", return_value=admin_ctxt
):
self.qos_plugin._validate_update_network_callback(
"NETWORK", "precommit_update", "test_plugin", **kwargs)
"NETWORK", "precommit_update", "test_plugin",
payload=events.DBEventPayload(
self.ctxt, desired_state=kwargs['network'],
states=(kwargs['original_network'],)))
if policy_id is None or policy_id == original_policy_id:
get_policy.assert_not_called()
get_ports.assert_not_called()

View File

@ -136,7 +136,23 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
self._test_trunk_update_notify(events.AFTER_UPDATE)
def test_trunk_update_notify_precommit_update(self):
self._test_trunk_update_notify(events.PRECOMMIT_UPDATE)
# TODO(boden): refactor back into _test_trunk_update_notify
# once all code uses neutron-lib payloads
with self.port() as parent_port:
callback = register_mock_callback(
constants.TRUNK, events.PRECOMMIT_UPDATE)
trunk = self._create_test_trunk(parent_port)
orig_trunk_obj = self._get_trunk_obj(trunk['id'])
trunk_req = {'trunk': {'name': 'foo'}}
self.trunk_plugin.update_trunk(self.context, trunk['id'],
trunk_req)
trunk_obj = self._get_trunk_obj(trunk['id'])
callback.assert_called_once_with(
constants.TRUNK, events.PRECOMMIT_UPDATE,
self.trunk_plugin, payload=mock.ANY)
call_payload = callback.call_args[1]['payload']
self.assertEqual(orig_trunk_obj, call_payload.states[0])
self.assertEqual(trunk_obj, call_payload.desired_state)
def _test_trunk_delete_notify(self, event):
with self.port() as parent_port: