Merge "Upgrade RPC version of SecurityGroup*Rpc"

This commit is contained in:
Zuul 2020-12-14 13:44:09 +00:00 committed by Gerrit Code Review
commit 69910a0bc5
2 changed files with 67 additions and 13 deletions

View File

@ -23,6 +23,7 @@ from neutron_lib import rpc as n_rpc
from neutron_lib.utils import net
from oslo_log import log as logging
import oslo_messaging
from oslo_utils import versionutils
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
@ -56,9 +57,11 @@ class SecurityGroupServerRpcApi(object):
def security_group_info_for_devices(self, context, devices):
LOG.debug("Get security group information for devices via rpc %r",
devices)
cctxt = self.client.prepare(version='1.2')
call_version = '1.3'
cctxt = self.client.prepare(version=call_version)
return cctxt.call(context, 'security_group_info_for_devices',
devices=devices)
devices=devices,
call_version=call_version)
class SecurityGroupServerRpcCallback(object):
@ -72,10 +75,12 @@ class SecurityGroupServerRpcCallback(object):
# API version history:
# 1.1 - Initial version
# 1.2 - security_group_info_for_devices introduced as an optimization
# 1.3 - security_group_info_for_devices returns member_ips with new
# structure.
# NOTE: target must not be overridden in subclasses
# to keep RPC API version consistent across plugins.
target = oslo_messaging.Target(version='1.2',
target = oslo_messaging.Target(version='1.3',
namespace=constants.RPC_NAMESPACE_SECGROUP)
@property
@ -115,9 +120,34 @@ class SecurityGroupServerRpcCallback(object):
Note that sets are serialized into lists by rpc code.
"""
# The original client RPC version was 1.2 before this change.
call_version = kwargs.pop("call_version", '1.2')
_target_version = versionutils.convert_version_to_tuple(call_version)
devices_info = kwargs.get('devices')
ports = self._get_devices_info(context, devices_info)
return self.plugin.security_group_info_for_ports(context, ports)
sg_info = self.plugin.security_group_info_for_ports(context, ports)
if _target_version < (1, 3):
LOG.warning("RPC security_group_info_for_devices call has "
"inconsistent version between server and agents. "
"The server supports RPC version is 1.3 while "
"the agent is %s.", call_version)
return self.make_compatible_sg_member_ips(sg_info)
return sg_info
def make_compatible_sg_member_ips(self, sg_info):
sg_member_ips = sg_info.get('sg_member_ips', {})
sg_ids = sg_member_ips.keys()
for sg_id in sg_ids:
member_ips = sg_member_ips.get(sg_id, {})
ipv4_ips = member_ips.get("IPv4", set())
comp_ipv4_ips = set([ip for ip, _mac in ipv4_ips])
ipv6_ips = member_ips.get("IPv6", set())
comp_ipv6_ips = set([ip for ip, _mac in ipv6_ips])
comp_ips = {"IPv4": comp_ipv4_ips,
"IPv6": comp_ipv6_ips}
sg_member_ips[sg_id] = comp_ips
sg_info['sg_member_ips'] = sg_member_ips
return sg_info
class SecurityGroupAgentRpcApiMixin(object):

View File

@ -307,24 +307,48 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
allowed_address_pairs=address_pairs)
yield self.deserialize(self.fmt, res1)
def test_security_group_info_for_devices_ipv4_addr_pair(self):
def _test_security_group_info_for_devices_ipv4_addr_pair(
self, call_version=None):
with self._port_with_addr_pairs_and_security_group() as port:
port_id = port['port']['id']
sg_id = port['port']['security_groups'][0]
devices = [port_id, 'no_exist_device']
ctx = context.get_admin_context()
# verify that address pairs are included in remote SG IPs
if call_version is not None:
sg_member_ips = self.rpc.security_group_info_for_devices(
ctx, devices=devices,
call_version=call_version)['sg_member_ips']
else:
sg_member_ips = self.rpc.security_group_info_for_devices(
ctx, devices=devices)['sg_member_ips']
if call_version == '1.3':
expected_member_ips = [
('10.0.1.0/24', '00:00:00:00:00:01'),
('11.0.0.1', '00:00:00:00:00:01'),
(port['port']['fixed_ips'][0]['ip_address'],
None)]
else:
expected_member_ips = [
'10.0.1.0/24',
'11.0.0.1',
port['port']['fixed_ips'][0]['ip_address']]
self.assertEqual(sorted(expected_member_ips),
sorted(sg_member_ips[sg_id]['IPv4']))
self._delete('ports', port_id)
def test_security_group_info_for_devices_ipv4_addr_pair(self):
self._test_security_group_info_for_devices_ipv4_addr_pair(
call_version='1.3')
def test_security_group_info_for_devices_ipv4_addr_pair_low_version(self):
self._test_security_group_info_for_devices_ipv4_addr_pair(
call_version='1.2')
def test_security_group_info_for_devices_ipv4_addr_pair_backward_cmp(
self):
self._test_security_group_info_for_devices_ipv4_addr_pair()
def test_security_group_rules_for_devices_ipv4_ingress_addr_pair(self):
fake_prefix = FAKE_PREFIX[const.IPv4]
with self._port_with_addr_pairs_and_security_group() as port:
@ -512,7 +536,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
port_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address']
ctx = context.get_admin_context()
ports_rpc = self.rpc.security_group_info_for_devices(
ctx, devices=devices)
ctx, devices=devices, call_version='1.3')
expected = {
'security_groups': {sg1_id: [
{'direction': 'egress', 'ethertype': const.IPv4,