From 92c636d8b2b391ce48ff349dcfd65051a0aa623e Mon Sep 17 00:00:00 2001 From: Nurmatov Mamatisa Date: Mon, 30 Aug 2021 17:58:04 +0300 Subject: [PATCH] Use payload callback for RPC resource_cache This patch switches the code over to the payload style of callbacks [1]. [1]https://docs.openstack.org/neutron-lib/latest/contributor/callbacks.html Change-Id: I2e65470e6f37ebccde01bdf3f9ed31b29567398f --- neutron/agent/resource_cache.py | 19 +++-- neutron/agent/rpc.py | 20 ++--- .../api/rpc/handlers/securitygroups_rpc.py | 32 ++++---- neutron/services/logapi/common/sg_callback.py | 8 +- neutron/services/logapi/drivers/manager.py | 2 +- .../tests/unit/agent/test_resource_cache.py | 31 ++++---- neutron/tests/unit/agent/test_rpc.py | 75 ++++++++++++------- .../logapi/common/test_sg_callback.py | 12 ++- 8 files changed, 122 insertions(+), 77 deletions(-) diff --git a/neutron/agent/resource_cache.py b/neutron/agent/resource_cache.py index 5f37dcead7f..48ec17e35cd 100644 --- a/neutron/agent/resource_cache.py +++ b/neutron/agent/resource_cache.py @@ -191,11 +191,13 @@ class RemoteResourceCache(object): else: LOG.debug("Received new resource %s: %s", rtype, resource) # local notification for agent internals to subscribe to - registry.notify(rtype, events.AFTER_UPDATE, self, - context=context, changed_fields=changed_fields, - existing=existing, updated=resource, - resource_id=resource.id, - agent_restarted=agent_restarted) + registry.publish(rtype, events.AFTER_UPDATE, self, + payload=events.DBEventPayload( + context, + metadata={'changed_fields': changed_fields, + 'agent_restarted': agent_restarted}, + resource_id=resource.id, + states=(existing, resource))) def record_resource_delete(self, context, rtype, resource_id): # deletions are final, record them so we never @@ -209,8 +211,11 @@ class RemoteResourceCache(object): self._deleted_ids_by_type[rtype].add(resource_id) existing = self._type_cache(rtype).pop(resource_id, None) # local notification for agent internals to subscribe to - registry.notify(rtype, events.AFTER_DELETE, self, context=context, - existing=existing, resource_id=resource_id) + registry.publish(rtype, events.AFTER_DELETE, self, + payload=events.DBEventPayload( + context, + resource_id=resource_id, + states=(existing,))) def _get_changed_fields(self, old, new): """Returns changed fields excluding update time and revision.""" diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index a5e75a69b4b..eb1e2930d6d 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -238,17 +238,18 @@ class CacheBackedPluginApi(PluginApi): for r in (resources.PORT, resources.NETWORK): registry.subscribe(self._legacy_notifier, r, e) - def _legacy_notifier(self, rtype, event, trigger, context, resource_id, - **kwargs): + def _legacy_notifier(self, rtype, event, trigger, payload): """Checks if legacy interface is expecting calls for resource. looks for port_update, network_delete, etc and calls them with the payloads the handlers are expecting (an ID). """ + context = payload.context + resource_id = payload.resource_id rtype = rtype.lower() # all legacy handlers don't camelcase - agent_restarted = kwargs.pop("agent_restarted", None) + agent_restarted = payload.metadata.pop("agent_restarted", None) method, host_with_activation, host_with_deactivation = ( - self._get_method_host(rtype, event, **kwargs)) + self._get_method_host(rtype, event, payload)) if not hasattr(self._legacy_interface, method): # TODO(kevinbenton): once these notifications are stable, emit # a deprecation warning for legacy handlers @@ -268,7 +269,7 @@ class CacheBackedPluginApi(PluginApi): payload["agent_restarted"] = agent_restarted getattr(self._legacy_interface, method)(context, **payload) - def _get_method_host(self, rtype, event, **kwargs): + def _get_method_host(self, rtype, event, payload): """Constructs the name of method to be called in the legacy interface. If the event received is a port update that contains a binding @@ -288,20 +289,21 @@ class CacheBackedPluginApi(PluginApi): # A port update was received. Find out if it is a binding activation # where a previous binding was deactivated BINDINGS = pb_ext.COLLECTION_NAME - if BINDINGS in kwargs.get('changed_fields', set()): + changed_fields = payload.metadata['changed_fields'] + if BINDINGS in changed_fields: existing_active_binding = ( utils.get_port_binding_by_status_and_host( - getattr(kwargs['existing'], 'bindings', []), + getattr(payload.states[0], 'bindings', []), constants.ACTIVE)) updated_active_binding = ( utils.get_port_binding_by_status_and_host( - getattr(kwargs['updated'], 'bindings', []), + getattr(payload.latest_state, 'bindings', []), constants.ACTIVE)) if (existing_active_binding and updated_active_binding and existing_active_binding.host != updated_active_binding.host): if (utils.get_port_binding_by_status_and_host( - getattr(kwargs['updated'], 'bindings', []), + getattr(payload.latest_state, 'bindings', []), constants.INACTIVE, host=existing_active_binding.host)): method = BINDING_DEACTIVATE diff --git a/neutron/api/rpc/handlers/securitygroups_rpc.py b/neutron/api/rpc/handlers/securitygroups_rpc.py index 6618625ec12..f114483daf3 100644 --- a/neutron/api/rpc/handlers/securitygroups_rpc.py +++ b/neutron/api/rpc/handlers/securitygroups_rpc.py @@ -281,17 +281,19 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin): return set([rule.security_group_id for rule in self.rcache.get_resources('SecurityGroupRule', filters)]) - def _add_child_sg_rules(self, rtype, event, trigger, context, updated, - **kwargs): + def _add_child_sg_rules(self, rtype, event, trigger, payload): # whenever we receive a full security group, add all child rules # because the server won't emit events for the individual rules on # creation. + context = payload.context + updated = payload.latest_state for rule in updated.rules: self.rcache.record_resource_update(context, 'SecurityGroupRule', rule) - def _clear_child_sg_rules(self, rtype, event, trigger, context, existing, - **kwargs): + def _clear_child_sg_rules(self, rtype, event, trigger, payload): + context = payload.context + existing = payload.states[0] if not existing: return # the server can delete an entire security group without notifying @@ -302,28 +304,30 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin): self.rcache.record_resource_delete(context, 'SecurityGroupRule', rule.id) - def _handle_sg_rule_delete(self, rtype, event, trigger, context, existing, - **kwargs): + def _handle_sg_rule_delete(self, rtype, event, trigger, payload): + existing = payload.states[0] if not existing: return sg_id = existing.security_group_id self._sg_agent.security_groups_rule_updated([sg_id]) - def _handle_sg_rule_update(self, rtype, event, trigger, context, existing, - updated, **kwargs): + def _handle_sg_rule_update(self, rtype, event, trigger, payload): + updated = payload.latest_state sg_id = updated.security_group_id self._sg_agent.security_groups_rule_updated([sg_id]) - def _handle_sg_member_delete(self, rtype, event, trigger, context, - existing, **kwargs): + def _handle_sg_member_delete(self, rtype, event, trigger, payload): # received on port delete + existing = payload.states[0] sgs = set(existing.security_group_ids) if existing else set() if sgs: self._sg_agent.security_groups_member_updated(sgs) - def _handle_sg_member_update(self, rtype, event, trigger, context, - existing, updated, changed_fields, **kwargs): + def _handle_sg_member_update(self, rtype, event, trigger, payload): # received on port update + existing = payload.states[0] + updated = payload.latest_state + changed_fields = payload.metadata['changed_fields'] sgs = set(existing.security_group_ids) if existing else set() if not changed_fields.intersection({'security_group_ids', 'fixed_ips', 'allowed_address_pairs'}): @@ -333,8 +337,8 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin): if sgs: self._sg_agent.security_groups_member_updated(sgs) - def _handle_address_group_event(self, rtype, event, trigger, context, - resource_id, **kwargs): + def _handle_address_group_event(self, rtype, event, trigger, payload): + resource_id = payload.resource_id if event == events.AFTER_UPDATE: self._sg_agent.address_group_updated(resource_id) else: diff --git a/neutron/services/logapi/common/sg_callback.py b/neutron/services/logapi/common/sg_callback.py index 39ce16c80a9..587e030e1cf 100644 --- a/neutron/services/logapi/common/sg_callback.py +++ b/neutron/services/logapi/common/sg_callback.py @@ -21,13 +21,13 @@ from neutron.services.logapi.drivers import manager class SecurityGroupRuleCallBack(manager.ResourceCallBackBase): - def handle_event(self, resource, event, trigger, **kwargs): - context = kwargs.get("context") - sg_rule = kwargs.get('security_group_rule') + def handle_event(self, resource, event, trigger, payload): + context = payload.context + sg_rule = payload.latest_state if sg_rule: sg_id = sg_rule.get('security_group_id') else: - sg_id = kwargs.get('security_group_id') + sg_id = payload.resource_id log_resources = db_api.get_logs_bound_sg(context, sg_id) if log_resources: diff --git a/neutron/services/logapi/drivers/manager.py b/neutron/services/logapi/drivers/manager.py index 3863038d0b7..55465c92d53 100644 --- a/neutron/services/logapi/drivers/manager.py +++ b/neutron/services/logapi/drivers/manager.py @@ -57,7 +57,7 @@ class ResourceCallBackBase(object): events.AFTER_DELETE): registry.subscribe(self.handle_event, resource, event) - def handle_event(self, resource, event, trigger, **kwargs): + def handle_event(self, resource, event, trigger, payload): """Handle resource callback event""" pass diff --git a/neutron/tests/unit/agent/test_resource_cache.py b/neutron/tests/unit/agent/test_resource_cache.py index a07b5d218af..5e6e9172870 100644 --- a/neutron/tests/unit/agent/test_resource_cache.py +++ b/neutron/tests/unit/agent/test_resource_cache.py @@ -58,7 +58,8 @@ class RemoteResourceCacheTestCase(base.BaseTestCase): def test__flood_cache_for_query_pulls_once(self): resources = [OVOLikeThing(66), OVOLikeThing(67)] received_kw = [] - receiver = lambda *a, **k: received_kw.append(k) + receiver = lambda r, e, t, payload: \ + received_kw.append(payload) registry.subscribe(receiver, 'goose', events.AFTER_UPDATE) self._pullmock.bulk_pull.side_effect = [ @@ -87,7 +88,7 @@ class RemoteResourceCacheTestCase(base.BaseTestCase): mock.ANY, 'goose', filter_kwargs={'id': (67, )}) self.assertCountEqual( - resources, [rec['updated'] for rec in received_kw]) + resources, [rec.latest_state for rec in received_kw]) def test_bulk_pull_doesnt_wipe_out_newer_data(self): self.rcache.record_resource_update( @@ -137,12 +138,13 @@ class RemoteResourceCacheTestCase(base.BaseTestCase): def test_record_resource_update(self): received_kw = [] - receiver = lambda *a, **k: received_kw.append(k) + receiver = lambda r, e, t, payload: \ + received_kw.append(payload) registry.subscribe(receiver, 'goose', events.AFTER_UPDATE) self.rcache.record_resource_update(self.ctx, 'goose', OVOLikeThing(3, size='large')) self.assertEqual(1, len(received_kw)) - self.assertIsNone(received_kw[0]['existing']) + self.assertIsNone(received_kw[0].states[0]) # another update with no changed fields results in no callback self.rcache.record_resource_update(self.ctx, 'goose', OVOLikeThing(3, size='large', @@ -152,29 +154,32 @@ class RemoteResourceCacheTestCase(base.BaseTestCase): OVOLikeThing(3, size='small', revision_number=101)) self.assertEqual(2, len(received_kw)) - self.assertEqual('large', received_kw[1]['existing'].size) - self.assertEqual('small', received_kw[1]['updated'].size) - self.assertEqual(set(['size']), received_kw[1]['changed_fields']) + self.assertEqual('large', received_kw[1].states[0].size) + self.assertEqual('small', received_kw[1].latest_state.size) + self.assertEqual(set(['size']), + received_kw[1].metadata['changed_fields']) def test_record_resource_delete(self): received_kw = [] - receiver = lambda *a, **k: received_kw.append(k) + receiver = lambda r, e, t, payload: \ + received_kw.append(payload) registry.subscribe(receiver, 'goose', events.AFTER_DELETE) self.rcache.record_resource_update(self.ctx, 'goose', OVOLikeThing(3, size='large')) self.rcache.record_resource_delete(self.ctx, 'goose', 3) self.assertEqual(1, len(received_kw)) - self.assertEqual(3, received_kw[0]['existing'].id) - self.assertEqual(3, received_kw[0]['resource_id']) + self.assertEqual(3, received_kw[0].states[0].id) + self.assertEqual(3, received_kw[0].resource_id) # deletes of non-existing cache items are still honored self.rcache.record_resource_delete(self.ctx, 'goose', 4) self.assertEqual(2, len(received_kw)) - self.assertIsNone(received_kw[1]['existing']) - self.assertEqual(4, received_kw[1]['resource_id']) + self.assertIsNone(received_kw[1].states[0]) + self.assertEqual(4, received_kw[1].resource_id) def test_record_resource_delete_ignores_dups(self): received_kw = [] - receiver = lambda *a, **k: received_kw.append(k) + receiver = lambda r, e, t, payload: \ + received_kw.append(payload) registry.subscribe(receiver, 'goose', events.AFTER_DELETE) self.rcache.record_resource_delete(self.ctx, 'goose', 3) self.assertEqual(1, len(received_kw)) diff --git a/neutron/tests/unit/agent/test_rpc.py b/neutron/tests/unit/agent/test_rpc.py index 9d61f8c91b2..9a5413f74f3 100644 --- a/neutron/tests/unit/agent/test_rpc.py +++ b/neutron/tests/unit/agent/test_rpc.py @@ -219,8 +219,10 @@ class TestCacheBackedPluginApi(base.BaseTestCase): def test__legacy_notifier_resource_delete(self): self._api._legacy_notifier(resources.PORT, events.AFTER_DELETE, self, - mock.ANY, resource_id=self._port_id, - existing=self._port) + payload=events.DBEventPayload( + mock.ANY, + resource_id=self._port_id, + states=(self._port,))) self._api._legacy_interface.port_update.assert_not_called() self._api._legacy_interface.port_delete.assert_called_once_with( mock.ANY, port={'id': self._port_id}, port_id=self._port_id) @@ -229,9 +231,14 @@ class TestCacheBackedPluginApi(base.BaseTestCase): def test__legacy_notifier_resource_update(self): updated_port = ports.Port(id=self._port_id, name='updated_port') self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, - mock.ANY, changed_fields=set(['name']), - resource_id=self._port_id, - existing=self._port, updated=updated_port) + payload=events.DBEventPayload( + mock.ANY, + metadata={ + 'changed_fields': set(['name']) + }, + resource_id=self._port_id, + states=(self._port, updated_port))) + self._api._legacy_interface.port_delete.assert_not_called() self._api._legacy_interface.port_update.assert_called_once_with( mock.ANY, port={'id': self._port_id}, port_id=self._port_id) @@ -246,11 +253,16 @@ class TestCacheBackedPluginApi(base.BaseTestCase): ports.PortBinding(port_id=self._port_id, host='host1', status=constants.INACTIVE)]) - self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, - mock.ANY, - changed_fields=set(['name', 'bindings']), - resource_id=self._port_id, - existing=self._port, updated=updated_port) + self._api._legacy_notifier( + resources.PORT, events.AFTER_UPDATE, self, + payload=events.DBEventPayload( + mock.ANY, + metadata={ + 'changed_fields': set(['name', 'bindings']) + }, + resource_id=self._port_id, + states=(self._port, updated_port))) + self._api._legacy_interface.port_update.assert_not_called() self._api._legacy_interface.port_delete.assert_not_called() @@ -267,27 +279,40 @@ class TestCacheBackedPluginApi(base.BaseTestCase): bindings=[ports.PortBinding(port_id=self._port_id, host='host2', status=constants.ACTIVE)]) - self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, - mock.ANY, - changed_fields=set(['name', 'bindings']), - resource_id=self._port_id, - existing=self._port, updated=updated_port) + self._api._legacy_notifier( + resources.PORT, events.AFTER_UPDATE, self, + payload=events.DBEventPayload( + mock.ANY, + metadata={ + 'changed_fields': set(['name', 'bindings']) + }, + resource_id=self._port_id, + states=(self._port, updated_port))) + self._api._legacy_interface.port_update.assert_called_once_with( mock.ANY, port={'id': self._port_id}, port_id=self._port_id) self._api._legacy_interface.port_delete.assert_not_called() self._api._legacy_interface.binding_deactivate.assert_not_called() def test__legacy_notifier_existing_or_updated_is_none(self): - self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, - self, mock.ANY, - changed_fields=set(['name', 'bindings']), - resource_id=self._port_id, - existing=None, updated=None) - self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, - mock.ANY, - changed_fields=set(['name', 'bindings']), - resource_id=self._port_id, - existing=self._port, updated=None) + self._api._legacy_notifier( + resources.PORT, events.AFTER_UPDATE, self, + payload=events.DBEventPayload( + mock.ANY, + metadata={ + 'changed_fields': set(['name', 'bindings']) + }, + resource_id=self._port_id, + states=(None, None))) + self._api._legacy_notifier( + resources.PORT, events.AFTER_UPDATE, self, + payload=events.DBEventPayload( + mock.ANY, + metadata={ + 'changed_fields': set(['name', 'bindings']) + }, + resource_id=self._port_id, + states=(self._port, None))) call = mock.call(mock.ANY, port={'id': self._port_id}, port_id=self._port_id) self._api._legacy_interface.port_update.assert_has_calls([call, call]) diff --git a/neutron/tests/unit/services/logapi/common/test_sg_callback.py b/neutron/tests/unit/services/logapi/common/test_sg_callback.py index 28f526d61a9..de2de116fb6 100644 --- a/neutron/tests/unit/services/logapi/common/test_sg_callback.py +++ b/neutron/tests/unit/services/logapi/common/test_sg_callback.py @@ -59,10 +59,14 @@ class TestSecurityGroupRuleCallback(base.BaseTestCase): fake_register() self.driver_manager.register_driver(FAKE_DRIVER) - registry.notify( - resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, mock.ANY) + registry.publish( + resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, mock.ANY, + payload=events.DBEventPayload(mock.ANY, states=(mock.ANY,))) mock_sg_cb.assert_called_once_with( - resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, mock.ANY) + resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, mock.ANY, + payload=mock.ANY) mock_sg_cb.reset_mock() - registry.notify('fake_resource', events.AFTER_DELETE, mock.ANY) + registry.publish('fake_resource', events.AFTER_DELETE, mock.ANY, + payload=events.DBEventPayload(mock.ANY, + states=(mock.ANY,))) mock_sg_cb.assert_not_called()