From 510089bc5fc08b81df24e67923000e5c56dfda66 Mon Sep 17 00:00:00 2001 From: LIU Yulong Date: Wed, 25 Nov 2020 09:43:05 +0800 Subject: [PATCH] Upgrade RPC version of SecurityGroup*Rpc A partial upgrading of neutron cluster, neutron-server has a newer version while neutron-agents not, does not run well after a RPC data structure upgrading. This patch upgrades the security group related RPC version between neutron-server and agents. A partial upgrading neutron cluster will explicitly raise error. The RPC version should be aligned. Closes-bug: #1903531 Related-bug: #1867119 Change-Id: I6cb2ba05fa3337be46eb01f2d9f869efa41e4db6 --- .../api/rpc/handlers/securitygroups_rpc.py | 38 +++++++++++++++-- .../unit/agent/test_securitygroups_rpc.py | 42 +++++++++++++++---- 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/neutron/api/rpc/handlers/securitygroups_rpc.py b/neutron/api/rpc/handlers/securitygroups_rpc.py index b1864723df4..f5abf809872 100644 --- a/neutron/api/rpc/handlers/securitygroups_rpc.py +++ b/neutron/api/rpc/handlers/securitygroups_rpc.py @@ -23,6 +23,7 @@ from neutron_lib import rpc as n_rpc from neutron_lib.utils import net from oslo_log import log as logging import oslo_messaging +from oslo_utils import versionutils from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc @@ -56,9 +57,11 @@ class SecurityGroupServerRpcApi(object): def security_group_info_for_devices(self, context, devices): LOG.debug("Get security group information for devices via rpc %r", devices) - cctxt = self.client.prepare(version='1.2') + call_version = '1.3' + cctxt = self.client.prepare(version=call_version) return cctxt.call(context, 'security_group_info_for_devices', - devices=devices) + devices=devices, + call_version=call_version) class SecurityGroupServerRpcCallback(object): @@ -72,10 +75,12 @@ class SecurityGroupServerRpcCallback(object): # API version history: # 1.1 - Initial version # 1.2 - security_group_info_for_devices introduced as an optimization + # 1.3 - security_group_info_for_devices returns member_ips with new + # structure. # NOTE: target must not be overridden in subclasses # to keep RPC API version consistent across plugins. - target = oslo_messaging.Target(version='1.2', + target = oslo_messaging.Target(version='1.3', namespace=constants.RPC_NAMESPACE_SECGROUP) @property @@ -115,9 +120,34 @@ class SecurityGroupServerRpcCallback(object): Note that sets are serialized into lists by rpc code. """ + # The original client RPC version was 1.2 before this change. + call_version = kwargs.pop("call_version", '1.2') + _target_version = versionutils.convert_version_to_tuple(call_version) devices_info = kwargs.get('devices') ports = self._get_devices_info(context, devices_info) - return self.plugin.security_group_info_for_ports(context, ports) + sg_info = self.plugin.security_group_info_for_ports(context, ports) + if _target_version < (1, 3): + LOG.warning("RPC security_group_info_for_devices call has " + "inconsistent version between server and agents. " + "The server supports RPC version is 1.3 while " + "the agent is %s.", call_version) + return self.make_compatible_sg_member_ips(sg_info) + return sg_info + + def make_compatible_sg_member_ips(self, sg_info): + sg_member_ips = sg_info.get('sg_member_ips', {}) + sg_ids = sg_member_ips.keys() + for sg_id in sg_ids: + member_ips = sg_member_ips.get(sg_id, {}) + ipv4_ips = member_ips.get("IPv4", set()) + comp_ipv4_ips = set([ip for ip, _mac in ipv4_ips]) + ipv6_ips = member_ips.get("IPv6", set()) + comp_ipv6_ips = set([ip for ip, _mac in ipv6_ips]) + comp_ips = {"IPv4": comp_ipv4_ips, + "IPv6": comp_ipv6_ips} + sg_member_ips[sg_id] = comp_ips + sg_info['sg_member_ips'] = sg_member_ips + return sg_info class SecurityGroupAgentRpcApiMixin(object): diff --git a/neutron/tests/unit/agent/test_securitygroups_rpc.py b/neutron/tests/unit/agent/test_securitygroups_rpc.py index 2696c98bec4..c5f2df50897 100644 --- a/neutron/tests/unit/agent/test_securitygroups_rpc.py +++ b/neutron/tests/unit/agent/test_securitygroups_rpc.py @@ -307,24 +307,48 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): allowed_address_pairs=address_pairs) yield self.deserialize(self.fmt, res1) - def test_security_group_info_for_devices_ipv4_addr_pair(self): + def _test_security_group_info_for_devices_ipv4_addr_pair( + self, call_version=None): with self._port_with_addr_pairs_and_security_group() as port: port_id = port['port']['id'] sg_id = port['port']['security_groups'][0] devices = [port_id, 'no_exist_device'] ctx = context.get_admin_context() # verify that address pairs are included in remote SG IPs - sg_member_ips = self.rpc.security_group_info_for_devices( - ctx, devices=devices)['sg_member_ips'] - expected_member_ips = [ - ('10.0.1.0/24', '00:00:00:00:00:01'), - ('11.0.0.1', '00:00:00:00:00:01'), - (port['port']['fixed_ips'][0]['ip_address'], - None)] + if call_version is not None: + sg_member_ips = self.rpc.security_group_info_for_devices( + ctx, devices=devices, + call_version=call_version)['sg_member_ips'] + else: + sg_member_ips = self.rpc.security_group_info_for_devices( + ctx, devices=devices)['sg_member_ips'] + if call_version == '1.3': + expected_member_ips = [ + ('10.0.1.0/24', '00:00:00:00:00:01'), + ('11.0.0.1', '00:00:00:00:00:01'), + (port['port']['fixed_ips'][0]['ip_address'], + None)] + else: + expected_member_ips = [ + '10.0.1.0/24', + '11.0.0.1', + port['port']['fixed_ips'][0]['ip_address']] self.assertEqual(sorted(expected_member_ips), sorted(sg_member_ips[sg_id]['IPv4'])) self._delete('ports', port_id) + def test_security_group_info_for_devices_ipv4_addr_pair(self): + self._test_security_group_info_for_devices_ipv4_addr_pair( + call_version='1.3') + + def test_security_group_info_for_devices_ipv4_addr_pair_low_version(self): + self._test_security_group_info_for_devices_ipv4_addr_pair( + call_version='1.2') + + def test_security_group_info_for_devices_ipv4_addr_pair_backward_cmp( + self): + self._test_security_group_info_for_devices_ipv4_addr_pair() + def test_security_group_rules_for_devices_ipv4_ingress_addr_pair(self): fake_prefix = FAKE_PREFIX[const.IPv4] with self._port_with_addr_pairs_and_security_group() as port: @@ -512,7 +536,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): port_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address'] ctx = context.get_admin_context() ports_rpc = self.rpc.security_group_info_for_devices( - ctx, devices=devices) + ctx, devices=devices, call_version='1.3') expected = { 'security_groups': {sg1_id: [ {'direction': 'egress', 'ethertype': const.IPv4,