Use payload callback for RPC resource_cache

This patch switches the code over to the payload style of callbacks [1].

[1]https://docs.openstack.org/neutron-lib/latest/contributor/callbacks.html

Change-Id: I2e65470e6f37ebccde01bdf3f9ed31b29567398f
This commit is contained in:
Nurmatov Mamatisa 2021-08-30 17:58:04 +03:00 committed by Mamatisa Nurmatov
parent dea5a9d3e3
commit 92c636d8b2
8 changed files with 122 additions and 77 deletions

View File

@ -191,11 +191,13 @@ class RemoteResourceCache(object):
else:
LOG.debug("Received new resource %s: %s", rtype, resource)
# local notification for agent internals to subscribe to
registry.notify(rtype, events.AFTER_UPDATE, self,
context=context, changed_fields=changed_fields,
existing=existing, updated=resource,
registry.publish(rtype, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
context,
metadata={'changed_fields': changed_fields,
'agent_restarted': agent_restarted},
resource_id=resource.id,
agent_restarted=agent_restarted)
states=(existing, resource)))
def record_resource_delete(self, context, rtype, resource_id):
# deletions are final, record them so we never
@ -209,8 +211,11 @@ class RemoteResourceCache(object):
self._deleted_ids_by_type[rtype].add(resource_id)
existing = self._type_cache(rtype).pop(resource_id, None)
# local notification for agent internals to subscribe to
registry.notify(rtype, events.AFTER_DELETE, self, context=context,
existing=existing, resource_id=resource_id)
registry.publish(rtype, events.AFTER_DELETE, self,
payload=events.DBEventPayload(
context,
resource_id=resource_id,
states=(existing,)))
def _get_changed_fields(self, old, new):
"""Returns changed fields excluding update time and revision."""

View File

@ -238,17 +238,18 @@ class CacheBackedPluginApi(PluginApi):
for r in (resources.PORT, resources.NETWORK):
registry.subscribe(self._legacy_notifier, r, e)
def _legacy_notifier(self, rtype, event, trigger, context, resource_id,
**kwargs):
def _legacy_notifier(self, rtype, event, trigger, payload):
"""Checks if legacy interface is expecting calls for resource.
looks for port_update, network_delete, etc and calls them with
the payloads the handlers are expecting (an ID).
"""
context = payload.context
resource_id = payload.resource_id
rtype = rtype.lower() # all legacy handlers don't camelcase
agent_restarted = kwargs.pop("agent_restarted", None)
agent_restarted = payload.metadata.pop("agent_restarted", None)
method, host_with_activation, host_with_deactivation = (
self._get_method_host(rtype, event, **kwargs))
self._get_method_host(rtype, event, payload))
if not hasattr(self._legacy_interface, method):
# TODO(kevinbenton): once these notifications are stable, emit
# a deprecation warning for legacy handlers
@ -268,7 +269,7 @@ class CacheBackedPluginApi(PluginApi):
payload["agent_restarted"] = agent_restarted
getattr(self._legacy_interface, method)(context, **payload)
def _get_method_host(self, rtype, event, **kwargs):
def _get_method_host(self, rtype, event, payload):
"""Constructs the name of method to be called in the legacy interface.
If the event received is a port update that contains a binding
@ -288,20 +289,21 @@ class CacheBackedPluginApi(PluginApi):
# A port update was received. Find out if it is a binding activation
# where a previous binding was deactivated
BINDINGS = pb_ext.COLLECTION_NAME
if BINDINGS in kwargs.get('changed_fields', set()):
changed_fields = payload.metadata['changed_fields']
if BINDINGS in changed_fields:
existing_active_binding = (
utils.get_port_binding_by_status_and_host(
getattr(kwargs['existing'], 'bindings', []),
getattr(payload.states[0], 'bindings', []),
constants.ACTIVE))
updated_active_binding = (
utils.get_port_binding_by_status_and_host(
getattr(kwargs['updated'], 'bindings', []),
getattr(payload.latest_state, 'bindings', []),
constants.ACTIVE))
if (existing_active_binding and updated_active_binding and
existing_active_binding.host !=
updated_active_binding.host):
if (utils.get_port_binding_by_status_and_host(
getattr(kwargs['updated'], 'bindings', []),
getattr(payload.latest_state, 'bindings', []),
constants.INACTIVE,
host=existing_active_binding.host)):
method = BINDING_DEACTIVATE

View File

@ -281,17 +281,19 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin):
return set([rule.security_group_id for rule in
self.rcache.get_resources('SecurityGroupRule', filters)])
def _add_child_sg_rules(self, rtype, event, trigger, context, updated,
**kwargs):
def _add_child_sg_rules(self, rtype, event, trigger, payload):
# whenever we receive a full security group, add all child rules
# because the server won't emit events for the individual rules on
# creation.
context = payload.context
updated = payload.latest_state
for rule in updated.rules:
self.rcache.record_resource_update(context, 'SecurityGroupRule',
rule)
def _clear_child_sg_rules(self, rtype, event, trigger, context, existing,
**kwargs):
def _clear_child_sg_rules(self, rtype, event, trigger, payload):
context = payload.context
existing = payload.states[0]
if not existing:
return
# the server can delete an entire security group without notifying
@ -302,28 +304,30 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin):
self.rcache.record_resource_delete(context, 'SecurityGroupRule',
rule.id)
def _handle_sg_rule_delete(self, rtype, event, trigger, context, existing,
**kwargs):
def _handle_sg_rule_delete(self, rtype, event, trigger, payload):
existing = payload.states[0]
if not existing:
return
sg_id = existing.security_group_id
self._sg_agent.security_groups_rule_updated([sg_id])
def _handle_sg_rule_update(self, rtype, event, trigger, context, existing,
updated, **kwargs):
def _handle_sg_rule_update(self, rtype, event, trigger, payload):
updated = payload.latest_state
sg_id = updated.security_group_id
self._sg_agent.security_groups_rule_updated([sg_id])
def _handle_sg_member_delete(self, rtype, event, trigger, context,
existing, **kwargs):
def _handle_sg_member_delete(self, rtype, event, trigger, payload):
# received on port delete
existing = payload.states[0]
sgs = set(existing.security_group_ids) if existing else set()
if sgs:
self._sg_agent.security_groups_member_updated(sgs)
def _handle_sg_member_update(self, rtype, event, trigger, context,
existing, updated, changed_fields, **kwargs):
def _handle_sg_member_update(self, rtype, event, trigger, payload):
# received on port update
existing = payload.states[0]
updated = payload.latest_state
changed_fields = payload.metadata['changed_fields']
sgs = set(existing.security_group_ids) if existing else set()
if not changed_fields.intersection({'security_group_ids', 'fixed_ips',
'allowed_address_pairs'}):
@ -333,8 +337,8 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin):
if sgs:
self._sg_agent.security_groups_member_updated(sgs)
def _handle_address_group_event(self, rtype, event, trigger, context,
resource_id, **kwargs):
def _handle_address_group_event(self, rtype, event, trigger, payload):
resource_id = payload.resource_id
if event == events.AFTER_UPDATE:
self._sg_agent.address_group_updated(resource_id)
else:

View File

@ -21,13 +21,13 @@ from neutron.services.logapi.drivers import manager
class SecurityGroupRuleCallBack(manager.ResourceCallBackBase):
def handle_event(self, resource, event, trigger, **kwargs):
context = kwargs.get("context")
sg_rule = kwargs.get('security_group_rule')
def handle_event(self, resource, event, trigger, payload):
context = payload.context
sg_rule = payload.latest_state
if sg_rule:
sg_id = sg_rule.get('security_group_id')
else:
sg_id = kwargs.get('security_group_id')
sg_id = payload.resource_id
log_resources = db_api.get_logs_bound_sg(context, sg_id)
if log_resources:

View File

@ -57,7 +57,7 @@ class ResourceCallBackBase(object):
events.AFTER_DELETE):
registry.subscribe(self.handle_event, resource, event)
def handle_event(self, resource, event, trigger, **kwargs):
def handle_event(self, resource, event, trigger, payload):
"""Handle resource callback event"""
pass

View File

@ -58,7 +58,8 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
def test__flood_cache_for_query_pulls_once(self):
resources = [OVOLikeThing(66), OVOLikeThing(67)]
received_kw = []
receiver = lambda *a, **k: received_kw.append(k)
receiver = lambda r, e, t, payload: \
received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_UPDATE)
self._pullmock.bulk_pull.side_effect = [
@ -87,7 +88,7 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
mock.ANY, 'goose', filter_kwargs={'id': (67, )})
self.assertCountEqual(
resources, [rec['updated'] for rec in received_kw])
resources, [rec.latest_state for rec in received_kw])
def test_bulk_pull_doesnt_wipe_out_newer_data(self):
self.rcache.record_resource_update(
@ -137,12 +138,13 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
def test_record_resource_update(self):
received_kw = []
receiver = lambda *a, **k: received_kw.append(k)
receiver = lambda r, e, t, payload: \
received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_UPDATE)
self.rcache.record_resource_update(self.ctx, 'goose',
OVOLikeThing(3, size='large'))
self.assertEqual(1, len(received_kw))
self.assertIsNone(received_kw[0]['existing'])
self.assertIsNone(received_kw[0].states[0])
# another update with no changed fields results in no callback
self.rcache.record_resource_update(self.ctx, 'goose',
OVOLikeThing(3, size='large',
@ -152,29 +154,32 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
OVOLikeThing(3, size='small',
revision_number=101))
self.assertEqual(2, len(received_kw))
self.assertEqual('large', received_kw[1]['existing'].size)
self.assertEqual('small', received_kw[1]['updated'].size)
self.assertEqual(set(['size']), received_kw[1]['changed_fields'])
self.assertEqual('large', received_kw[1].states[0].size)
self.assertEqual('small', received_kw[1].latest_state.size)
self.assertEqual(set(['size']),
received_kw[1].metadata['changed_fields'])
def test_record_resource_delete(self):
received_kw = []
receiver = lambda *a, **k: received_kw.append(k)
receiver = lambda r, e, t, payload: \
received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_DELETE)
self.rcache.record_resource_update(self.ctx, 'goose',
OVOLikeThing(3, size='large'))
self.rcache.record_resource_delete(self.ctx, 'goose', 3)
self.assertEqual(1, len(received_kw))
self.assertEqual(3, received_kw[0]['existing'].id)
self.assertEqual(3, received_kw[0]['resource_id'])
self.assertEqual(3, received_kw[0].states[0].id)
self.assertEqual(3, received_kw[0].resource_id)
# deletes of non-existing cache items are still honored
self.rcache.record_resource_delete(self.ctx, 'goose', 4)
self.assertEqual(2, len(received_kw))
self.assertIsNone(received_kw[1]['existing'])
self.assertEqual(4, received_kw[1]['resource_id'])
self.assertIsNone(received_kw[1].states[0])
self.assertEqual(4, received_kw[1].resource_id)
def test_record_resource_delete_ignores_dups(self):
received_kw = []
receiver = lambda *a, **k: received_kw.append(k)
receiver = lambda r, e, t, payload: \
received_kw.append(payload)
registry.subscribe(receiver, 'goose', events.AFTER_DELETE)
self.rcache.record_resource_delete(self.ctx, 'goose', 3)
self.assertEqual(1, len(received_kw))

View File

@ -219,8 +219,10 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
def test__legacy_notifier_resource_delete(self):
self._api._legacy_notifier(resources.PORT, events.AFTER_DELETE, self,
mock.ANY, resource_id=self._port_id,
existing=self._port)
payload=events.DBEventPayload(
mock.ANY,
resource_id=self._port_id,
states=(self._port,)))
self._api._legacy_interface.port_update.assert_not_called()
self._api._legacy_interface.port_delete.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
@ -229,9 +231,14 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
def test__legacy_notifier_resource_update(self):
updated_port = ports.Port(id=self._port_id, name='updated_port')
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
mock.ANY, changed_fields=set(['name']),
payload=events.DBEventPayload(
mock.ANY,
metadata={
'changed_fields': set(['name'])
},
resource_id=self._port_id,
existing=self._port, updated=updated_port)
states=(self._port, updated_port)))
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
@ -246,11 +253,16 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
ports.PortBinding(port_id=self._port_id,
host='host1',
status=constants.INACTIVE)])
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
self._api._legacy_notifier(
resources.PORT, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
mock.ANY,
changed_fields=set(['name', 'bindings']),
metadata={
'changed_fields': set(['name', 'bindings'])
},
resource_id=self._port_id,
existing=self._port, updated=updated_port)
states=(self._port, updated_port)))
self._api._legacy_interface.port_update.assert_not_called()
self._api._legacy_interface.port_delete.assert_not_called()
@ -267,27 +279,40 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
bindings=[ports.PortBinding(port_id=self._port_id,
host='host2',
status=constants.ACTIVE)])
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
self._api._legacy_notifier(
resources.PORT, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
mock.ANY,
changed_fields=set(['name', 'bindings']),
metadata={
'changed_fields': set(['name', 'bindings'])
},
resource_id=self._port_id,
existing=self._port, updated=updated_port)
states=(self._port, updated_port)))
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()
def test__legacy_notifier_existing_or_updated_is_none(self):
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE,
self, mock.ANY,
changed_fields=set(['name', 'bindings']),
resource_id=self._port_id,
existing=None, updated=None)
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
self._api._legacy_notifier(
resources.PORT, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
mock.ANY,
changed_fields=set(['name', 'bindings']),
metadata={
'changed_fields': set(['name', 'bindings'])
},
resource_id=self._port_id,
existing=self._port, updated=None)
states=(None, None)))
self._api._legacy_notifier(
resources.PORT, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
mock.ANY,
metadata={
'changed_fields': set(['name', 'bindings'])
},
resource_id=self._port_id,
states=(self._port, None)))
call = mock.call(mock.ANY, port={'id': self._port_id},
port_id=self._port_id)
self._api._legacy_interface.port_update.assert_has_calls([call, call])

View File

@ -59,10 +59,14 @@ class TestSecurityGroupRuleCallback(base.BaseTestCase):
fake_register()
self.driver_manager.register_driver(FAKE_DRIVER)
registry.notify(
resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, mock.ANY)
registry.publish(
resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, mock.ANY,
payload=events.DBEventPayload(mock.ANY, states=(mock.ANY,)))
mock_sg_cb.assert_called_once_with(
resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, mock.ANY)
resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, mock.ANY,
payload=mock.ANY)
mock_sg_cb.reset_mock()
registry.notify('fake_resource', events.AFTER_DELETE, mock.ANY)
registry.publish('fake_resource', events.AFTER_DELETE, mock.ANY,
payload=events.DBEventPayload(mock.ANY,
states=(mock.ANY,)))
mock_sg_cb.assert_not_called()