Merge "use callback payloads for PRECOMMIT_UPDATE events"
This commit is contained in:
commit
4da778fec9
|
@ -265,10 +265,11 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
|
||||||
old_router = self._make_router_dict(router_db)
|
old_router = self._make_router_dict(router_db)
|
||||||
if data:
|
if data:
|
||||||
router_db.update(data)
|
router_db.update(data)
|
||||||
registry.notify(resources.ROUTER, events.PRECOMMIT_UPDATE,
|
registry.publish(resources.ROUTER, events.PRECOMMIT_UPDATE, self,
|
||||||
self, context=context, router_id=router_id,
|
payload=events.DBEventPayload(
|
||||||
router=data, router_db=router_db,
|
context, request_body=data,
|
||||||
old_router=old_router)
|
states=(old_router,), resource_id=router_id,
|
||||||
|
desired_state=router_db))
|
||||||
return router_db
|
return router_db
|
||||||
|
|
||||||
@db_api.retry_if_session_inactive()
|
@db_api.retry_if_session_inactive()
|
||||||
|
|
|
@ -106,42 +106,46 @@ class DVRResourceOperationHandler(object):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE])
|
@registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE])
|
||||||
def _handle_distributed_migration(self, resource, event, trigger, context,
|
def _handle_distributed_migration(self, resource, event,
|
||||||
router_id, router, router_db, old_router,
|
trigger, payload=None):
|
||||||
**kwargs):
|
|
||||||
"""Event handler for router update migration to distributed."""
|
"""Event handler for router update migration to distributed."""
|
||||||
if not self._validate_router_migration(context, router_db, router):
|
if not self._validate_router_migration(
|
||||||
|
payload.context, payload.desired_state,
|
||||||
|
payload.request_body):
|
||||||
return
|
return
|
||||||
|
|
||||||
migrating_to_distributed = (
|
migrating_to_distributed = (
|
||||||
not router_db.extra_attributes.distributed and
|
not payload.desired_state.extra_attributes.distributed and
|
||||||
router.get('distributed') is True)
|
payload.request_body.get('distributed') is True)
|
||||||
|
|
||||||
if migrating_to_distributed:
|
if migrating_to_distributed:
|
||||||
if old_router['ha']:
|
if payload.states[0]['ha']:
|
||||||
old_owner = const.DEVICE_OWNER_HA_REPLICATED_INT
|
old_owner = const.DEVICE_OWNER_HA_REPLICATED_INT
|
||||||
else:
|
else:
|
||||||
old_owner = const.DEVICE_OWNER_ROUTER_INTF
|
old_owner = const.DEVICE_OWNER_ROUTER_INTF
|
||||||
self.l3plugin._migrate_router_ports(
|
self.l3plugin._migrate_router_ports(
|
||||||
context, router_db,
|
payload.context, payload.desired_state,
|
||||||
old_owner=old_owner,
|
old_owner=old_owner,
|
||||||
new_owner=const.DEVICE_OWNER_DVR_INTERFACE)
|
new_owner=const.DEVICE_OWNER_DVR_INTERFACE)
|
||||||
else:
|
else:
|
||||||
if router.get('ha'):
|
if payload.request_body.get('ha'):
|
||||||
new_owner = const.DEVICE_OWNER_HA_REPLICATED_INT
|
new_owner = const.DEVICE_OWNER_HA_REPLICATED_INT
|
||||||
else:
|
else:
|
||||||
new_owner = const.DEVICE_OWNER_ROUTER_INTF
|
new_owner = const.DEVICE_OWNER_ROUTER_INTF
|
||||||
self.l3plugin._migrate_router_ports(
|
self.l3plugin._migrate_router_ports(
|
||||||
context, router_db,
|
payload.context, payload.desired_state,
|
||||||
old_owner=const.DEVICE_OWNER_DVR_INTERFACE,
|
old_owner=const.DEVICE_OWNER_DVR_INTERFACE,
|
||||||
new_owner=new_owner)
|
new_owner=new_owner)
|
||||||
|
|
||||||
cur_agents = self.l3plugin.list_l3_agents_hosting_router(
|
cur_agents = self.l3plugin.list_l3_agents_hosting_router(
|
||||||
context, router_db['id'])['agents']
|
payload.context, payload.resource_id)['agents']
|
||||||
for agent in cur_agents:
|
for agent in cur_agents:
|
||||||
self.l3plugin._unbind_router(context, router_db['id'], agent['id'])
|
self.l3plugin._unbind_router(
|
||||||
|
payload.context, payload.resource_id,
|
||||||
|
agent['id'])
|
||||||
self.l3plugin.set_extra_attr_value(
|
self.l3plugin.set_extra_attr_value(
|
||||||
context, router_db, 'distributed', migrating_to_distributed)
|
payload.context, payload.desired_state,
|
||||||
|
'distributed', migrating_to_distributed)
|
||||||
|
|
||||||
@registry.receives(resources.ROUTER, [events.AFTER_UPDATE])
|
@registry.receives(resources.ROUTER, [events.AFTER_UPDATE])
|
||||||
def _delete_snat_interfaces_after_change(self, resource, event, trigger,
|
def _delete_snat_interfaces_after_change(self, resource, event, trigger,
|
||||||
|
|
|
@ -397,20 +397,18 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
||||||
{'status': constants.ERROR})['status']
|
{'status': constants.ERROR})['status']
|
||||||
|
|
||||||
@registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE])
|
@registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE])
|
||||||
def _validate_migration(self, resource, event, trigger, context,
|
def _validate_migration(self, resource, event, trigger, payload=None):
|
||||||
router_id, router, router_db, old_router,
|
|
||||||
**kwargs):
|
|
||||||
"""Event handler on precommit update to validate migration."""
|
"""Event handler on precommit update to validate migration."""
|
||||||
|
|
||||||
original_ha_state = old_router['ha']
|
original_ha_state = payload.states[0]['ha']
|
||||||
requested_ha_state = router.get('ha')
|
requested_ha_state = payload.request_body.get('ha')
|
||||||
|
|
||||||
ha_changed = (requested_ha_state is not None and
|
ha_changed = (requested_ha_state is not None and
|
||||||
requested_ha_state != original_ha_state)
|
requested_ha_state != original_ha_state)
|
||||||
if not ha_changed:
|
if not ha_changed:
|
||||||
return
|
return
|
||||||
|
|
||||||
if router_db.admin_state_up:
|
if payload.desired_state.admin_state_up:
|
||||||
msg = _('Cannot change HA attribute of active routers. Please '
|
msg = _('Cannot change HA attribute of active routers. Please '
|
||||||
'set router admin_state_up to False prior to upgrade')
|
'set router admin_state_up to False prior to upgrade')
|
||||||
raise n_exc.BadRequest(resource='router', msg=msg)
|
raise n_exc.BadRequest(resource='router', msg=msg)
|
||||||
|
@ -418,29 +416,32 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
||||||
if requested_ha_state:
|
if requested_ha_state:
|
||||||
# This will throw HANotEnoughAvailableAgents if there aren't
|
# This will throw HANotEnoughAvailableAgents if there aren't
|
||||||
# enough l3 agents to handle this router.
|
# enough l3 agents to handle this router.
|
||||||
self.get_number_of_agents_for_scheduling(context)
|
self.get_number_of_agents_for_scheduling(payload.context)
|
||||||
else:
|
else:
|
||||||
|
|
||||||
ha_network = self.get_ha_network(context,
|
ha_network = self.get_ha_network(payload.context,
|
||||||
router_db.tenant_id)
|
payload.desired_state.tenant_id)
|
||||||
self._delete_vr_id_allocation(
|
self._delete_vr_id_allocation(
|
||||||
context, ha_network, router_db.extra_attributes.ha_vr_id)
|
payload.context, ha_network,
|
||||||
router_db.extra_attributes.ha_vr_id = None
|
payload.desired_state.extra_attributes.ha_vr_id)
|
||||||
if router.get('distributed') or old_router['distributed']:
|
payload.desired_state.extra_attributes.ha_vr_id = None
|
||||||
self.set_extra_attr_value(context, router_db,
|
if (payload.request_body.get('distributed') or
|
||||||
|
payload.states[0]['distributed']):
|
||||||
|
self.set_extra_attr_value(payload.context, payload.desired_state,
|
||||||
'ha', requested_ha_state)
|
'ha', requested_ha_state)
|
||||||
return
|
return
|
||||||
if requested_ha_state:
|
if requested_ha_state:
|
||||||
self._migrate_router_ports(
|
self._migrate_router_ports(
|
||||||
context, router_db,
|
payload.context, payload.desired_state,
|
||||||
old_owner=constants.DEVICE_OWNER_ROUTER_INTF,
|
old_owner=constants.DEVICE_OWNER_ROUTER_INTF,
|
||||||
new_owner=constants.DEVICE_OWNER_HA_REPLICATED_INT)
|
new_owner=constants.DEVICE_OWNER_HA_REPLICATED_INT)
|
||||||
else:
|
else:
|
||||||
self._migrate_router_ports(
|
self._migrate_router_ports(
|
||||||
context, router_db,
|
payload.context, payload.desired_state,
|
||||||
old_owner=constants.DEVICE_OWNER_HA_REPLICATED_INT,
|
old_owner=constants.DEVICE_OWNER_HA_REPLICATED_INT,
|
||||||
new_owner=constants.DEVICE_OWNER_ROUTER_INTF)
|
new_owner=constants.DEVICE_OWNER_ROUTER_INTF)
|
||||||
self.set_extra_attr_value(context, router_db, 'ha', requested_ha_state)
|
self.set_extra_attr_value(
|
||||||
|
payload.context, payload.desired_state, 'ha', requested_ha_state)
|
||||||
|
|
||||||
@registry.receives(resources.ROUTER, [events.AFTER_UPDATE])
|
@registry.receives(resources.ROUTER, [events.AFTER_UPDATE])
|
||||||
def _reconfigure_ha_resources(self, resource, event, trigger, context,
|
def _reconfigure_ha_resources(self, resource, event, trigger, context,
|
||||||
|
|
|
@ -56,7 +56,11 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
|
||||||
# from being processed. This is a hook point for backend's validation;
|
# from being processed. This is a hook point for backend's validation;
|
||||||
# we raise to propagate the reason for the failure.
|
# we raise to propagate the reason for the failure.
|
||||||
try:
|
try:
|
||||||
registry.notify(res, event, self, **kwargs)
|
if 'payload' in kwargs:
|
||||||
|
# TODO(boden): remove shim once all callbacks use payloads
|
||||||
|
registry.publish(res, event, self, payload=kwargs['payload'])
|
||||||
|
else:
|
||||||
|
registry.notify(res, event, self, **kwargs)
|
||||||
except exceptions.CallbackFailure as e:
|
except exceptions.CallbackFailure as e:
|
||||||
if exc_cls:
|
if exc_cls:
|
||||||
reason = (_('cannot perform %(event)s due to %(reason)s') %
|
reason = (_('cannot perform %(event)s due to %(reason)s') %
|
||||||
|
@ -261,8 +265,11 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
|
||||||
self._registry_notify(
|
self._registry_notify(
|
||||||
resources.SECURITY_GROUP,
|
resources.SECURITY_GROUP,
|
||||||
events.PRECOMMIT_UPDATE,
|
events.PRECOMMIT_UPDATE,
|
||||||
exc_cls=ext_sg.SecurityGroupConflict, **kwargs)
|
exc_cls=ext_sg.SecurityGroupConflict,
|
||||||
|
payload=events.DBEventPayload(
|
||||||
|
context, request_body=s,
|
||||||
|
states=(kwargs['original_security_group'],),
|
||||||
|
resource_id=id, desired_state=sg_dict))
|
||||||
registry.notify(resources.SECURITY_GROUP, events.AFTER_UPDATE, self,
|
registry.notify(resources.SECURITY_GROUP, events.AFTER_UPDATE, self,
|
||||||
**kwargs)
|
**kwargs)
|
||||||
return sg_dict
|
return sg_dict
|
||||||
|
|
|
@ -899,11 +899,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||||
self.type_manager.extend_network_dict_provider(
|
self.type_manager.extend_network_dict_provider(
|
||||||
context, updated_network)
|
context, updated_network)
|
||||||
|
|
||||||
kwargs = {'context': context, 'network': updated_network,
|
registry.publish(resources.NETWORK, events.PRECOMMIT_UPDATE, self,
|
||||||
'original_network': original_network,
|
payload=events.DBEventPayload(
|
||||||
'request': net_data}
|
context, request_body=net_data,
|
||||||
registry.notify(
|
states=(original_network,),
|
||||||
resources.NETWORK, events.PRECOMMIT_UPDATE, self, **kwargs)
|
resource_id=id,
|
||||||
|
desired_state=updated_network))
|
||||||
|
|
||||||
# TODO(QoS): Move out to the extension framework somehow.
|
# TODO(QoS): Move out to the extension framework somehow.
|
||||||
need_network_update_notify |= (
|
need_network_update_notify |= (
|
||||||
|
@ -1347,13 +1348,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||||
need_port_update_notify |= self._process_port_binding(
|
need_port_update_notify |= self._process_port_binding(
|
||||||
mech_context, attrs)
|
mech_context, attrs)
|
||||||
|
|
||||||
kwargs = {
|
registry.publish(
|
||||||
'context': context,
|
resources.PORT, events.PRECOMMIT_UPDATE, self,
|
||||||
'port': updated_port,
|
payload=events.DBEventPayload(
|
||||||
'original_port': original_port,
|
context, request_body=attrs, states=(original_port,),
|
||||||
}
|
resource_id=id, desired_state=updated_port))
|
||||||
registry.notify(
|
|
||||||
resources.PORT, events.PRECOMMIT_UPDATE, self, **kwargs)
|
|
||||||
|
|
||||||
# For DVR router interface ports we need to retrieve the
|
# For DVR router interface ports we need to retrieve the
|
||||||
# DVRPortbinding context instead of the normal port context.
|
# DVRPortbinding context instead of the normal port context.
|
||||||
|
|
|
@ -42,33 +42,50 @@ LOG = logging.getLogger(__name__)
|
||||||
CHECK_REQUIREMENTS = 'dry-run'
|
CHECK_REQUIREMENTS = 'dry-run'
|
||||||
|
|
||||||
|
|
||||||
@db_api.retry_if_session_inactive()
|
|
||||||
def _ensure_external_network_default_value_callback(
|
def _ensure_external_network_default_value_callback(
|
||||||
resource, event, trigger, context, request, network, **kwargs):
|
resource, event, trigger, **kwargs):
|
||||||
"""Ensure the is_default db field matches the create/update request."""
|
"""Ensure the is_default db field matches the create/update request."""
|
||||||
is_default = request.get(api_const.IS_DEFAULT)
|
|
||||||
if is_default is None:
|
|
||||||
return
|
|
||||||
if is_default:
|
|
||||||
# ensure there is only one default external network at any given time
|
|
||||||
pager = base_obj.Pager(limit=1)
|
|
||||||
objs = net_obj.ExternalNetwork.get_objects(context,
|
|
||||||
_pager=pager, is_default=True)
|
|
||||||
if objs:
|
|
||||||
if objs[0] and network['id'] != objs[0].network_id:
|
|
||||||
raise exceptions.DefaultExternalNetworkExists(
|
|
||||||
net_id=objs[0].network_id)
|
|
||||||
|
|
||||||
orig = kwargs.get('original_network')
|
# TODO(boden): remove shim once all callbacks use payloads
|
||||||
if orig and orig.get(api_const.IS_DEFAULT) == is_default:
|
if 'payload' in kwargs:
|
||||||
return
|
_request = kwargs['payload'].request_body
|
||||||
network[api_const.IS_DEFAULT] = is_default
|
_context = kwargs['payload'].context
|
||||||
# Reflect the status of the is_default on the create/update request
|
_network = kwargs['payload'].desired_state
|
||||||
obj = net_obj.ExternalNetwork.get_object(context,
|
_orig = kwargs['payload'].states[0]
|
||||||
network_id=network['id'])
|
else:
|
||||||
if obj:
|
_request = kwargs['request']
|
||||||
obj.is_default = is_default
|
_context = kwargs['context']
|
||||||
obj.update()
|
_network = kwargs['network']
|
||||||
|
_orig = kwargs.get('original_network')
|
||||||
|
|
||||||
|
@db_api.retry_if_session_inactive()
|
||||||
|
def _do_ensure_external_network_default_value_callback(
|
||||||
|
context, request, orig, network):
|
||||||
|
is_default = request.get(api_const.IS_DEFAULT)
|
||||||
|
if is_default is None:
|
||||||
|
return
|
||||||
|
if is_default:
|
||||||
|
# ensure only one default external network at any given time
|
||||||
|
pager = base_obj.Pager(limit=1)
|
||||||
|
objs = net_obj.ExternalNetwork.get_objects(context,
|
||||||
|
_pager=pager, is_default=True)
|
||||||
|
if objs:
|
||||||
|
if objs[0] and network['id'] != objs[0].network_id:
|
||||||
|
raise exceptions.DefaultExternalNetworkExists(
|
||||||
|
net_id=objs[0].network_id)
|
||||||
|
|
||||||
|
if orig and orig.get(api_const.IS_DEFAULT) == is_default:
|
||||||
|
return
|
||||||
|
network[api_const.IS_DEFAULT] = is_default
|
||||||
|
# Reflect the status of the is_default on the create/update request
|
||||||
|
obj = net_obj.ExternalNetwork.get_object(context,
|
||||||
|
network_id=network['id'])
|
||||||
|
if obj:
|
||||||
|
obj.is_default = is_default
|
||||||
|
obj.update()
|
||||||
|
|
||||||
|
_do_ensure_external_network_default_value_callback(
|
||||||
|
_context, _request, _orig, _network)
|
||||||
|
|
||||||
|
|
||||||
@resource_extend.has_resource_extenders
|
@resource_extend.has_resource_extenders
|
||||||
|
|
|
@ -92,19 +92,19 @@ class DriverController(object):
|
||||||
self._stm.del_resource_associations(context, [router_id])
|
self._stm.del_resource_associations(context, [router_id])
|
||||||
|
|
||||||
@registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE])
|
@registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE])
|
||||||
def _update_router_provider(self, resource, event, trigger, context,
|
def _update_router_provider(self, resource, event, trigger, payload=None):
|
||||||
router_id, router, old_router, router_db,
|
|
||||||
**kwargs):
|
|
||||||
"""Handle transition between providers.
|
"""Handle transition between providers.
|
||||||
|
|
||||||
The provider can currently be changed only by the caller updating
|
The provider can currently be changed only by the caller updating
|
||||||
'ha' and/or 'distributed' attributes. If we allow updates of flavor_id
|
'ha' and/or 'distributed' attributes. If we allow updates of flavor_id
|
||||||
directly in the future those requests will also land here.
|
directly in the future those requests will also land here.
|
||||||
"""
|
"""
|
||||||
drv = self.get_provider_for_router(context, router_id)
|
drv = self.get_provider_for_router(payload.context,
|
||||||
|
payload.resource_id)
|
||||||
new_drv = None
|
new_drv = None
|
||||||
if _flavor_specified(router):
|
if _flavor_specified(payload.request_body):
|
||||||
if router['flavor_id'] != old_router['flavor_id']:
|
if (payload.request_body['flavor_id'] !=
|
||||||
|
payload.states[0]['flavor_id']):
|
||||||
# TODO(kevinbenton): this is currently disallowed by the API
|
# TODO(kevinbenton): this is currently disallowed by the API
|
||||||
# so we shouldn't hit it but this is a placeholder to add
|
# so we shouldn't hit it but this is a placeholder to add
|
||||||
# support later.
|
# support later.
|
||||||
|
@ -113,7 +113,7 @@ class DriverController(object):
|
||||||
# the following is to support updating the 'ha' and 'distributed'
|
# the following is to support updating the 'ha' and 'distributed'
|
||||||
# attributes via the API.
|
# attributes via the API.
|
||||||
try:
|
try:
|
||||||
_ensure_driver_supports_request(drv, router)
|
_ensure_driver_supports_request(drv, payload.request_body)
|
||||||
except lib_exc.InvalidInput:
|
except lib_exc.InvalidInput:
|
||||||
# the current driver does not support this request, we need to
|
# the current driver does not support this request, we need to
|
||||||
# migrate to a new provider. populate the distributed and ha
|
# migrate to a new provider. populate the distributed and ha
|
||||||
|
@ -123,25 +123,29 @@ class DriverController(object):
|
||||||
# we bail because changing the provider without changing
|
# we bail because changing the provider without changing
|
||||||
# the flavor will make things inconsistent. We can probably
|
# the flavor will make things inconsistent. We can probably
|
||||||
# update the flavor automatically in the future.
|
# update the flavor automatically in the future.
|
||||||
if old_router['flavor_id']:
|
if payload.states[0]['flavor_id']:
|
||||||
raise lib_exc.InvalidInput(error_message=_(
|
raise lib_exc.InvalidInput(error_message=_(
|
||||||
"Changing the 'ha' and 'distributed' attributes on a "
|
"Changing the 'ha' and 'distributed' attributes on a "
|
||||||
"router associated with a flavor is not supported"))
|
"router associated with a flavor is not supported"))
|
||||||
if 'distributed' not in router:
|
if 'distributed' not in payload.request_body:
|
||||||
router['distributed'] = old_router['distributed']
|
payload.request_body['distributed'] = (payload.states[0]
|
||||||
if 'ha' not in router:
|
['distributed'])
|
||||||
router['ha'] = old_router['distributed']
|
if 'ha' not in payload.request_body:
|
||||||
new_drv = self._attrs_to_driver(router)
|
payload.request_body['ha'] = payload.states[0]['distributed']
|
||||||
|
new_drv = self._attrs_to_driver(payload.request_body)
|
||||||
if new_drv:
|
if new_drv:
|
||||||
LOG.debug("Router %(id)s migrating from %(old)s provider to "
|
LOG.debug("Router %(id)s migrating from %(old)s provider to "
|
||||||
"%(new)s provider.", {'id': router_id, 'old': drv,
|
"%(new)s provider.", {'id': payload.resource_id,
|
||||||
|
'old': drv,
|
||||||
'new': new_drv})
|
'new': new_drv})
|
||||||
_ensure_driver_supports_request(new_drv, router)
|
_ensure_driver_supports_request(new_drv, payload.request_body)
|
||||||
# TODO(kevinbenton): notify old driver explicitly of driver change
|
# TODO(kevinbenton): notify old driver explicitly of driver change
|
||||||
with context.session.begin(subtransactions=True):
|
with payload.context.session.begin(subtransactions=True):
|
||||||
self._stm.del_resource_associations(context, [router_id])
|
self._stm.del_resource_associations(
|
||||||
|
payload.context, [payload.resource_id])
|
||||||
self._stm.add_resource_association(
|
self._stm.add_resource_association(
|
||||||
context, plugin_constants.L3, new_drv.name, router_id)
|
payload.context, plugin_constants.L3,
|
||||||
|
new_drv.name, payload.resource_id)
|
||||||
|
|
||||||
def get_provider_for_router(self, context, router_id):
|
def get_provider_for_router(self, context, router_id):
|
||||||
"""Return the provider driver handle for a router id."""
|
"""Return the provider driver handle for a router id."""
|
||||||
|
|
|
@ -98,28 +98,27 @@ class QoSPlugin(qos.QoSPluginBase):
|
||||||
self.validate_policy_for_port(policy, port)
|
self.validate_policy_for_port(policy, port)
|
||||||
|
|
||||||
def _validate_update_port_callback(self, resource, event, trigger,
|
def _validate_update_port_callback(self, resource, event, trigger,
|
||||||
**kwargs):
|
payload=None):
|
||||||
context = kwargs['context']
|
context = payload.context
|
||||||
original_policy_id = kwargs['original_port'].get(
|
original_policy_id = payload.states[0].get(
|
||||||
qos_consts.QOS_POLICY_ID)
|
qos_consts.QOS_POLICY_ID)
|
||||||
policy_id = kwargs['port'].get(qos_consts.QOS_POLICY_ID)
|
policy_id = payload.desired_state.get(qos_consts.QOS_POLICY_ID)
|
||||||
|
|
||||||
if policy_id is None or policy_id == original_policy_id:
|
if policy_id is None or policy_id == original_policy_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
updated_port = ports_object.Port.get_object(
|
updated_port = ports_object.Port.get_object(
|
||||||
context, id=kwargs['port']['id'])
|
context, id=payload.desired_state['id'])
|
||||||
|
|
||||||
policy = policy_object.QosPolicy.get_object(
|
policy = policy_object.QosPolicy.get_object(
|
||||||
context.elevated(), id=policy_id)
|
context.elevated(), id=policy_id)
|
||||||
|
|
||||||
self.validate_policy_for_port(policy, updated_port)
|
self.validate_policy_for_port(policy, updated_port)
|
||||||
|
|
||||||
def _validate_update_network_callback(self, resource, event, trigger,
|
def _validate_update_network_callback(self, resource, event, trigger,
|
||||||
**kwargs):
|
payload=None):
|
||||||
context = kwargs['context']
|
context = payload.context
|
||||||
original_network = kwargs['original_network']
|
original_network = payload.states[0]
|
||||||
updated_network = kwargs['network']
|
updated_network = payload.desired_state
|
||||||
|
|
||||||
original_policy_id = original_network.get(qos_consts.QOS_POLICY_ID)
|
original_policy_id = original_network.get(qos_consts.QOS_POLICY_ID)
|
||||||
policy_id = updated_network.get(qos_consts.QOS_POLICY_ID)
|
policy_id = updated_network.get(qos_consts.QOS_POLICY_ID)
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
|
# TODO(boden): remove this once moved over to neutron-lib payloads
|
||||||
class TrunkPayload(object):
|
class TrunkPayload(object):
|
||||||
"""Payload for trunk-related callback registry notifications."""
|
"""Payload for trunk-related callback registry notifications."""
|
||||||
|
|
||||||
|
|
|
@ -245,13 +245,16 @@ class TrunkPlugin(service_base.ServicePluginBase,
|
||||||
# these are DB properties only.
|
# these are DB properties only.
|
||||||
trunk_obj.update_fields(trunk_data, reset_changes=True)
|
trunk_obj.update_fields(trunk_data, reset_changes=True)
|
||||||
trunk_obj.update()
|
trunk_obj.update()
|
||||||
payload = callbacks.TrunkPayload(context, trunk_id,
|
payload = events.DBEventPayload(
|
||||||
original_trunk=original_trunk,
|
context, resource_id=trunk_id, states=(original_trunk,),
|
||||||
current_trunk=trunk_obj)
|
desired_state=trunk_obj, request_body=trunk_data)
|
||||||
registry.notify(constants.TRUNK, events.PRECOMMIT_UPDATE, self,
|
registry.publish(constants.TRUNK, events.PRECOMMIT_UPDATE, self,
|
||||||
payload=payload)
|
payload=payload)
|
||||||
registry.notify(constants.TRUNK, events.AFTER_UPDATE, self,
|
registry.notify(constants.TRUNK, events.AFTER_UPDATE, self,
|
||||||
payload=payload)
|
payload=callbacks.TrunkPayload(
|
||||||
|
context, trunk_id,
|
||||||
|
original_trunk=original_trunk,
|
||||||
|
current_trunk=trunk_obj))
|
||||||
return trunk_obj
|
return trunk_obj
|
||||||
|
|
||||||
def delete_trunk(self, context, trunk_id):
|
def delete_trunk(self, context, trunk_id):
|
||||||
|
|
|
@ -331,16 +331,19 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
|
||||||
original_sg_dict = self.mixin.create_security_group(self.ctx,
|
original_sg_dict = self.mixin.create_security_group(self.ctx,
|
||||||
FAKE_SECGROUP)
|
FAKE_SECGROUP)
|
||||||
sg_id = original_sg_dict['id']
|
sg_id = original_sg_dict['id']
|
||||||
with mock.patch.object(registry, "notify") as mock_notify:
|
with mock.patch.object(registry, "publish") as mock_notify:
|
||||||
fake_secgroup = copy.deepcopy(FAKE_SECGROUP)
|
fake_secgroup = copy.deepcopy(FAKE_SECGROUP)
|
||||||
fake_secgroup['security_group']['name'] = 'updated_fake'
|
fake_secgroup['security_group']['name'] = 'updated_fake'
|
||||||
sg_dict = self.mixin.update_security_group(
|
sg_dict = self.mixin.update_security_group(
|
||||||
self.ctx, sg_id, fake_secgroup)
|
self.ctx, sg_id, fake_secgroup)
|
||||||
mock_notify.assert_has_calls([mock.call('security_group',
|
|
||||||
'precommit_update', mock.ANY, context=mock.ANY,
|
mock_notify.assert_has_calls(
|
||||||
original_security_group=original_sg_dict,
|
[mock.call('security_group', 'precommit_update', mock.ANY,
|
||||||
security_group=sg_dict,
|
payload=mock.ANY)])
|
||||||
security_group_id=sg_id)])
|
payload = mock_notify.call_args[1]['payload']
|
||||||
|
self.assertEqual(original_sg_dict, payload.states[0])
|
||||||
|
self.assertEqual(sg_id, payload.resource_id)
|
||||||
|
self.assertEqual(sg_dict, payload.desired_state)
|
||||||
|
|
||||||
def test_security_group_precommit_and_after_delete_event(self):
|
def test_security_group_precommit_and_after_delete_event(self):
|
||||||
sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
|
sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
|
||||||
|
|
|
@ -3675,9 +3675,12 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
|
||||||
self.assertFalse(body['routers'])
|
self.assertFalse(body['routers'])
|
||||||
|
|
||||||
def test_router_update_precommit_event(self):
|
def test_router_update_precommit_event(self):
|
||||||
nset = lambda *a, **k: setattr(k['router_db'], 'name',
|
|
||||||
k['old_router']['name'] + '_ha!')
|
def _nset(r, v, s, payload=None):
|
||||||
registry.subscribe(nset, resources.ROUTER, events.PRECOMMIT_UPDATE)
|
setattr(payload.desired_state, 'name',
|
||||||
|
payload.states[0]['name'] + '_ha!')
|
||||||
|
|
||||||
|
registry.subscribe(_nset, resources.ROUTER, events.PRECOMMIT_UPDATE)
|
||||||
with self.router(name='original') as r:
|
with self.router(name='original') as r:
|
||||||
update = self._update('routers', r['router']['id'],
|
update = self._update('routers', r['router']['id'],
|
||||||
{'router': {'name': 'hi'}})
|
{'router': {'name': 'hi'}})
|
||||||
|
|
|
@ -245,8 +245,10 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
|
||||||
self.deserialize(self.fmt, req.get_response(self.api))
|
self.deserialize(self.fmt, req.get_response(self.api))
|
||||||
precommit_update.assert_called_once_with(
|
precommit_update.assert_called_once_with(
|
||||||
resources.NETWORK, events.PRECOMMIT_UPDATE, mock.ANY,
|
resources.NETWORK, events.PRECOMMIT_UPDATE, mock.ANY,
|
||||||
context=mock.ANY, network=mock.ANY, original_network=mock.ANY,
|
payload=mock.ANY)
|
||||||
request=mock.ANY)
|
self.assertEqual(
|
||||||
|
'updated',
|
||||||
|
precommit_update.call_args[1]['payload'].desired_state['name'])
|
||||||
|
|
||||||
def test_network_after_update_callback(self):
|
def test_network_after_update_callback(self):
|
||||||
after_update = mock.Mock()
|
after_update = mock.Mock()
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
from neutron_lib.callbacks import events
|
||||||
from neutron_lib import constants
|
from neutron_lib import constants
|
||||||
from neutron_lib import context
|
from neutron_lib import context
|
||||||
from neutron_lib import exceptions as lib_exc
|
from neutron_lib import exceptions as lib_exc
|
||||||
|
@ -97,9 +98,10 @@ class TestDriverController(testlib_api.SqlTestCase):
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
lib_exc.InvalidInput,
|
lib_exc.InvalidInput,
|
||||||
test_dc._update_router_provider,
|
test_dc._update_router_provider,
|
||||||
None, None, None, None,
|
None, None, None,
|
||||||
None, {'name': 'testname'},
|
payload=events.DBEventPayload(
|
||||||
{'flavor_id': 'old_fid'}, None)
|
None, request_body={'name': 'testname'},
|
||||||
|
states=({'flavor_id': 'old_fid'},)))
|
||||||
|
|
||||||
def test__set_router_provider_attr_lookups(self):
|
def test__set_router_provider_attr_lookups(self):
|
||||||
# ensure correct drivers are looked up based on attrs
|
# ensure correct drivers are looked up based on attrs
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
import copy
|
import copy
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
from neutron_lib.callbacks import events
|
||||||
from neutron_lib import context
|
from neutron_lib import context
|
||||||
from neutron_lib import exceptions as lib_exc
|
from neutron_lib import exceptions as lib_exc
|
||||||
from neutron_lib.plugins import constants as plugins_constants
|
from neutron_lib.plugins import constants as plugins_constants
|
||||||
|
@ -193,7 +194,6 @@ class TestQosPlugin(base.BaseQosTestCase):
|
||||||
original_policy_id=None):
|
original_policy_id=None):
|
||||||
port_id = uuidutils.generate_uuid()
|
port_id = uuidutils.generate_uuid()
|
||||||
kwargs = {
|
kwargs = {
|
||||||
"context": self.ctxt,
|
|
||||||
"port": {
|
"port": {
|
||||||
"id": port_id,
|
"id": port_id,
|
||||||
qos_consts.QOS_POLICY_ID: policy_id
|
qos_consts.QOS_POLICY_ID: policy_id
|
||||||
|
@ -218,7 +218,10 @@ class TestQosPlugin(base.BaseQosTestCase):
|
||||||
self.ctxt, "elevated", return_value=admin_ctxt
|
self.ctxt, "elevated", return_value=admin_ctxt
|
||||||
):
|
):
|
||||||
self.qos_plugin._validate_update_port_callback(
|
self.qos_plugin._validate_update_port_callback(
|
||||||
"PORT", "precommit_update", "test_plugin", **kwargs)
|
"PORT", "precommit_update", "test_plugin",
|
||||||
|
payload=events.DBEventPayload(
|
||||||
|
self.ctxt, desired_state=kwargs['port'],
|
||||||
|
states=(kwargs['original_port'],)))
|
||||||
if policy_id is None or policy_id == original_policy_id:
|
if policy_id is None or policy_id == original_policy_id:
|
||||||
get_port.assert_not_called()
|
get_port.assert_not_called()
|
||||||
get_policy.assert_not_called()
|
get_policy.assert_not_called()
|
||||||
|
@ -276,7 +279,10 @@ class TestQosPlugin(base.BaseQosTestCase):
|
||||||
self.ctxt, "elevated", return_value=admin_ctxt
|
self.ctxt, "elevated", return_value=admin_ctxt
|
||||||
):
|
):
|
||||||
self.qos_plugin._validate_update_network_callback(
|
self.qos_plugin._validate_update_network_callback(
|
||||||
"NETWORK", "precommit_update", "test_plugin", **kwargs)
|
"NETWORK", "precommit_update", "test_plugin",
|
||||||
|
payload=events.DBEventPayload(
|
||||||
|
self.ctxt, desired_state=kwargs['network'],
|
||||||
|
states=(kwargs['original_network'],)))
|
||||||
if policy_id is None or policy_id == original_policy_id:
|
if policy_id is None or policy_id == original_policy_id:
|
||||||
get_policy.assert_not_called()
|
get_policy.assert_not_called()
|
||||||
get_ports.assert_not_called()
|
get_ports.assert_not_called()
|
||||||
|
|
|
@ -136,7 +136,23 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
|
||||||
self._test_trunk_update_notify(events.AFTER_UPDATE)
|
self._test_trunk_update_notify(events.AFTER_UPDATE)
|
||||||
|
|
||||||
def test_trunk_update_notify_precommit_update(self):
|
def test_trunk_update_notify_precommit_update(self):
|
||||||
self._test_trunk_update_notify(events.PRECOMMIT_UPDATE)
|
# TODO(boden): refactor back into _test_trunk_update_notify
|
||||||
|
# once all code uses neutron-lib payloads
|
||||||
|
with self.port() as parent_port:
|
||||||
|
callback = register_mock_callback(
|
||||||
|
constants.TRUNK, events.PRECOMMIT_UPDATE)
|
||||||
|
trunk = self._create_test_trunk(parent_port)
|
||||||
|
orig_trunk_obj = self._get_trunk_obj(trunk['id'])
|
||||||
|
trunk_req = {'trunk': {'name': 'foo'}}
|
||||||
|
self.trunk_plugin.update_trunk(self.context, trunk['id'],
|
||||||
|
trunk_req)
|
||||||
|
trunk_obj = self._get_trunk_obj(trunk['id'])
|
||||||
|
callback.assert_called_once_with(
|
||||||
|
constants.TRUNK, events.PRECOMMIT_UPDATE,
|
||||||
|
self.trunk_plugin, payload=mock.ANY)
|
||||||
|
call_payload = callback.call_args[1]['payload']
|
||||||
|
self.assertEqual(orig_trunk_obj, call_payload.states[0])
|
||||||
|
self.assertEqual(trunk_obj, call_payload.desired_state)
|
||||||
|
|
||||||
def _test_trunk_delete_notify(self, event):
|
def _test_trunk_delete_notify(self, event):
|
||||||
with self.port() as parent_port:
|
with self.port() as parent_port:
|
||||||
|
|
Loading…
Reference in New Issue