Upgrade RPC version of SecurityGroup*Rpc
A partial upgrading of neutron cluster, neutron-server has a newer version while neutron-agents not, does not run well after a RPC data structure upgrading. This patch upgrades the security group related RPC version between neutron-server and agents. A partial upgrading neutron cluster will explicitly raise error. The RPC version should be aligned. Closes-bug: #1903531 Related-bug: #1867119 Change-Id: I6cb2ba05fa3337be46eb01f2d9f869efa41e4db6
This commit is contained in:
parent
cc6e3274b0
commit
510089bc5f
@ -23,6 +23,7 @@ from neutron_lib import rpc as n_rpc
|
|||||||
from neutron_lib.utils import net
|
from neutron_lib.utils import net
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
|
from oslo_utils import versionutils
|
||||||
|
|
||||||
from neutron.api.rpc.callbacks import resources
|
from neutron.api.rpc.callbacks import resources
|
||||||
from neutron.api.rpc.handlers import resources_rpc
|
from neutron.api.rpc.handlers import resources_rpc
|
||||||
@ -56,9 +57,11 @@ class SecurityGroupServerRpcApi(object):
|
|||||||
def security_group_info_for_devices(self, context, devices):
|
def security_group_info_for_devices(self, context, devices):
|
||||||
LOG.debug("Get security group information for devices via rpc %r",
|
LOG.debug("Get security group information for devices via rpc %r",
|
||||||
devices)
|
devices)
|
||||||
cctxt = self.client.prepare(version='1.2')
|
call_version = '1.3'
|
||||||
|
cctxt = self.client.prepare(version=call_version)
|
||||||
return cctxt.call(context, 'security_group_info_for_devices',
|
return cctxt.call(context, 'security_group_info_for_devices',
|
||||||
devices=devices)
|
devices=devices,
|
||||||
|
call_version=call_version)
|
||||||
|
|
||||||
|
|
||||||
class SecurityGroupServerRpcCallback(object):
|
class SecurityGroupServerRpcCallback(object):
|
||||||
@ -72,10 +75,12 @@ class SecurityGroupServerRpcCallback(object):
|
|||||||
# API version history:
|
# API version history:
|
||||||
# 1.1 - Initial version
|
# 1.1 - Initial version
|
||||||
# 1.2 - security_group_info_for_devices introduced as an optimization
|
# 1.2 - security_group_info_for_devices introduced as an optimization
|
||||||
|
# 1.3 - security_group_info_for_devices returns member_ips with new
|
||||||
|
# structure.
|
||||||
|
|
||||||
# NOTE: target must not be overridden in subclasses
|
# NOTE: target must not be overridden in subclasses
|
||||||
# to keep RPC API version consistent across plugins.
|
# to keep RPC API version consistent across plugins.
|
||||||
target = oslo_messaging.Target(version='1.2',
|
target = oslo_messaging.Target(version='1.3',
|
||||||
namespace=constants.RPC_NAMESPACE_SECGROUP)
|
namespace=constants.RPC_NAMESPACE_SECGROUP)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -115,9 +120,34 @@ class SecurityGroupServerRpcCallback(object):
|
|||||||
|
|
||||||
Note that sets are serialized into lists by rpc code.
|
Note that sets are serialized into lists by rpc code.
|
||||||
"""
|
"""
|
||||||
|
# The original client RPC version was 1.2 before this change.
|
||||||
|
call_version = kwargs.pop("call_version", '1.2')
|
||||||
|
_target_version = versionutils.convert_version_to_tuple(call_version)
|
||||||
devices_info = kwargs.get('devices')
|
devices_info = kwargs.get('devices')
|
||||||
ports = self._get_devices_info(context, devices_info)
|
ports = self._get_devices_info(context, devices_info)
|
||||||
return self.plugin.security_group_info_for_ports(context, ports)
|
sg_info = self.plugin.security_group_info_for_ports(context, ports)
|
||||||
|
if _target_version < (1, 3):
|
||||||
|
LOG.warning("RPC security_group_info_for_devices call has "
|
||||||
|
"inconsistent version between server and agents. "
|
||||||
|
"The server supports RPC version is 1.3 while "
|
||||||
|
"the agent is %s.", call_version)
|
||||||
|
return self.make_compatible_sg_member_ips(sg_info)
|
||||||
|
return sg_info
|
||||||
|
|
||||||
|
def make_compatible_sg_member_ips(self, sg_info):
|
||||||
|
sg_member_ips = sg_info.get('sg_member_ips', {})
|
||||||
|
sg_ids = sg_member_ips.keys()
|
||||||
|
for sg_id in sg_ids:
|
||||||
|
member_ips = sg_member_ips.get(sg_id, {})
|
||||||
|
ipv4_ips = member_ips.get("IPv4", set())
|
||||||
|
comp_ipv4_ips = set([ip for ip, _mac in ipv4_ips])
|
||||||
|
ipv6_ips = member_ips.get("IPv6", set())
|
||||||
|
comp_ipv6_ips = set([ip for ip, _mac in ipv6_ips])
|
||||||
|
comp_ips = {"IPv4": comp_ipv4_ips,
|
||||||
|
"IPv6": comp_ipv6_ips}
|
||||||
|
sg_member_ips[sg_id] = comp_ips
|
||||||
|
sg_info['sg_member_ips'] = sg_member_ips
|
||||||
|
return sg_info
|
||||||
|
|
||||||
|
|
||||||
class SecurityGroupAgentRpcApiMixin(object):
|
class SecurityGroupAgentRpcApiMixin(object):
|
||||||
|
@ -307,24 +307,48 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
|
|||||||
allowed_address_pairs=address_pairs)
|
allowed_address_pairs=address_pairs)
|
||||||
yield self.deserialize(self.fmt, res1)
|
yield self.deserialize(self.fmt, res1)
|
||||||
|
|
||||||
def test_security_group_info_for_devices_ipv4_addr_pair(self):
|
def _test_security_group_info_for_devices_ipv4_addr_pair(
|
||||||
|
self, call_version=None):
|
||||||
with self._port_with_addr_pairs_and_security_group() as port:
|
with self._port_with_addr_pairs_and_security_group() as port:
|
||||||
port_id = port['port']['id']
|
port_id = port['port']['id']
|
||||||
sg_id = port['port']['security_groups'][0]
|
sg_id = port['port']['security_groups'][0]
|
||||||
devices = [port_id, 'no_exist_device']
|
devices = [port_id, 'no_exist_device']
|
||||||
ctx = context.get_admin_context()
|
ctx = context.get_admin_context()
|
||||||
# verify that address pairs are included in remote SG IPs
|
# verify that address pairs are included in remote SG IPs
|
||||||
|
if call_version is not None:
|
||||||
|
sg_member_ips = self.rpc.security_group_info_for_devices(
|
||||||
|
ctx, devices=devices,
|
||||||
|
call_version=call_version)['sg_member_ips']
|
||||||
|
else:
|
||||||
sg_member_ips = self.rpc.security_group_info_for_devices(
|
sg_member_ips = self.rpc.security_group_info_for_devices(
|
||||||
ctx, devices=devices)['sg_member_ips']
|
ctx, devices=devices)['sg_member_ips']
|
||||||
|
if call_version == '1.3':
|
||||||
expected_member_ips = [
|
expected_member_ips = [
|
||||||
('10.0.1.0/24', '00:00:00:00:00:01'),
|
('10.0.1.0/24', '00:00:00:00:00:01'),
|
||||||
('11.0.0.1', '00:00:00:00:00:01'),
|
('11.0.0.1', '00:00:00:00:00:01'),
|
||||||
(port['port']['fixed_ips'][0]['ip_address'],
|
(port['port']['fixed_ips'][0]['ip_address'],
|
||||||
None)]
|
None)]
|
||||||
|
else:
|
||||||
|
expected_member_ips = [
|
||||||
|
'10.0.1.0/24',
|
||||||
|
'11.0.0.1',
|
||||||
|
port['port']['fixed_ips'][0]['ip_address']]
|
||||||
self.assertEqual(sorted(expected_member_ips),
|
self.assertEqual(sorted(expected_member_ips),
|
||||||
sorted(sg_member_ips[sg_id]['IPv4']))
|
sorted(sg_member_ips[sg_id]['IPv4']))
|
||||||
self._delete('ports', port_id)
|
self._delete('ports', port_id)
|
||||||
|
|
||||||
|
def test_security_group_info_for_devices_ipv4_addr_pair(self):
|
||||||
|
self._test_security_group_info_for_devices_ipv4_addr_pair(
|
||||||
|
call_version='1.3')
|
||||||
|
|
||||||
|
def test_security_group_info_for_devices_ipv4_addr_pair_low_version(self):
|
||||||
|
self._test_security_group_info_for_devices_ipv4_addr_pair(
|
||||||
|
call_version='1.2')
|
||||||
|
|
||||||
|
def test_security_group_info_for_devices_ipv4_addr_pair_backward_cmp(
|
||||||
|
self):
|
||||||
|
self._test_security_group_info_for_devices_ipv4_addr_pair()
|
||||||
|
|
||||||
def test_security_group_rules_for_devices_ipv4_ingress_addr_pair(self):
|
def test_security_group_rules_for_devices_ipv4_ingress_addr_pair(self):
|
||||||
fake_prefix = FAKE_PREFIX[const.IPv4]
|
fake_prefix = FAKE_PREFIX[const.IPv4]
|
||||||
with self._port_with_addr_pairs_and_security_group() as port:
|
with self._port_with_addr_pairs_and_security_group() as port:
|
||||||
@ -512,7 +536,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
|
|||||||
port_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address']
|
port_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address']
|
||||||
ctx = context.get_admin_context()
|
ctx = context.get_admin_context()
|
||||||
ports_rpc = self.rpc.security_group_info_for_devices(
|
ports_rpc = self.rpc.security_group_info_for_devices(
|
||||||
ctx, devices=devices)
|
ctx, devices=devices, call_version='1.3')
|
||||||
expected = {
|
expected = {
|
||||||
'security_groups': {sg1_id: [
|
'security_groups': {sg1_id: [
|
||||||
{'direction': 'egress', 'ethertype': const.IPv4,
|
{'direction': 'egress', 'ethertype': const.IPv4,
|
||||||
|
Loading…
Reference in New Issue
Block a user