Divide-and-conquer security group beasts
In one specific compute node, the security group rules can be enormous quantity. This patch adds a step-by-step processing method to deal with the large number of the security group rules. And also changes or adds some LOG. Related-Bug: #1813703 Related-Bug: #1813704 Related-Bug: #1813707 Conflicts: neutron/common/constants.py neutron/agent/common/ovs_lib.py Conflicts: neutron/agent/securitygroups_rpc.py neutron/common/constants.py neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py Conflicts: neutron/agent/common/ovs_lib.py neutron/common/constants.py Change-Id: I57bf27ec75cf848271c5a28b22beee12b8bd5faa (cherry picked from commit6ac420df7e
) (cherry picked from commitf5d110e15f
) (cherry picked from commit5424b9a68c
)
This commit is contained in:
parent
d9eccad736
commit
49df07c703
|
@ -31,6 +31,7 @@ from neutron._i18n import _, _LE, _LI, _LW
|
|||
from neutron.agent.common import ip_lib
|
||||
from neutron.agent.common import utils
|
||||
from neutron.agent.ovsdb import api as ovsdb
|
||||
from neutron.common import constants as common_constants
|
||||
from neutron.conf.agent import ovs_conf
|
||||
from neutron.plugins.common import constants as p_const
|
||||
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
|
||||
|
@ -355,7 +356,11 @@ class OVSBridge(BaseOVS):
|
|||
if 'cookie' not in kw:
|
||||
kw['cookie'] = self._default_cookie
|
||||
flow_strs = [_build_flow_expr_str(kw, action) for kw in kwargs_list]
|
||||
self.run_ofctl('%s-flows' % action, ['-'], '\n'.join(flow_strs))
|
||||
LOG.debug("Processing %d OpenFlow rules.", len(flow_strs))
|
||||
step = common_constants.AGENT_RES_PROCESSING_STEP
|
||||
for i in range(0, len(flow_strs), step):
|
||||
self.run_ofctl('%s-flows' % action, ['-'],
|
||||
'\n'.join(flow_strs[i:i + step]))
|
||||
|
||||
def add_flow(self, **kwargs):
|
||||
self.do_action_flows('add', [kwargs])
|
||||
|
|
|
@ -24,6 +24,7 @@ import oslo_messaging
|
|||
from neutron._i18n import _LI, _LW
|
||||
from neutron.agent import firewall
|
||||
from neutron.api.rpc.handlers import securitygroups_rpc
|
||||
from neutron.common import constants as common_constants
|
||||
from neutron.conf.agent import securitygroups_rpc as sc_cfg
|
||||
|
||||
|
||||
|
@ -139,12 +140,17 @@ class SecurityGroupAgentRpc(object):
|
|||
self._apply_port_filter(device_ids)
|
||||
|
||||
def _apply_port_filter(self, device_ids, update_filter=False):
|
||||
step = common_constants.AGENT_RES_PROCESSING_STEP
|
||||
if self.use_enhanced_rpc:
|
||||
devices_info = self.plugin_rpc.security_group_info_for_devices(
|
||||
self.context, list(device_ids))
|
||||
devices = devices_info['devices']
|
||||
security_groups = devices_info['security_groups']
|
||||
security_group_member_ips = devices_info['sg_member_ips']
|
||||
devices = {}
|
||||
security_groups = {}
|
||||
security_group_member_ips = {}
|
||||
for i in range(0, len(device_ids), step):
|
||||
devices_info = self.plugin_rpc.security_group_info_for_devices(
|
||||
self.context, list(device_ids)[i:i + step])
|
||||
devices.update(devices_info['devices'])
|
||||
security_groups.update(devices_info['security_groups'])
|
||||
security_group_member_ips.update(devices_info['sg_member_ips'])
|
||||
else:
|
||||
devices = self.plugin_rpc.security_group_rules_for_devices(
|
||||
self.context, list(device_ids))
|
||||
|
|
|
@ -230,3 +230,15 @@ PORT_BINDING_STATUS_ACTIVE = 'ACTIVE'
|
|||
PORT_BINDING_STATUS_INACTIVE = 'INACTIVE'
|
||||
PORT_BINDING_STATUSES = (PORT_BINDING_STATUS_ACTIVE,
|
||||
PORT_BINDING_STATUS_INACTIVE)
|
||||
|
||||
# Number of resources for neutron agent side functions to deal
|
||||
# with large sets.
|
||||
# Setting this value does not count on special conditions, it is just a human
|
||||
# countable or scalable number. [1] gives us the method to test the scale
|
||||
# issue. And we have tested the value of 1000, 500, 200, 100. But for 100,
|
||||
# ovs-agent will have a lower timeout probability. And according to the
|
||||
# testing result, step size 100 can indeed cost about 10% much more time
|
||||
# than 500/1000. But such extra time looks inevitably needed to be sacrificed
|
||||
# for the restart success rate.
|
||||
# [1] http://paste.openstack.org/show/745685/
|
||||
AGENT_RES_PROCESSING_STEP = 100
|
||||
|
|
|
@ -1628,8 +1628,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
port_info.get('updated', set()))
|
||||
need_binding_devices = []
|
||||
skipped_devices = set()
|
||||
start = time.time()
|
||||
if devices_added_updated:
|
||||
start = time.time()
|
||||
(skipped_devices, need_binding_devices,
|
||||
failed_devices['added']) = (
|
||||
self.treat_devices_added_or_updated(
|
||||
|
@ -1654,6 +1654,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
self._add_port_tag_info(need_binding_devices)
|
||||
self.sg_agent.setup_port_filters(added_ports,
|
||||
port_info.get('updated', set()))
|
||||
LOG.info(_LI("process_network_ports - iteration:%(iter_num)d - "
|
||||
"agent port security group processed in %(elapsed).3f"),
|
||||
{'iter_num': self.iter_num,
|
||||
'elapsed': time.time() - start})
|
||||
failed_devices['added'] |= self._bind_devices(need_binding_devices)
|
||||
|
||||
if 'removed' in port_info and port_info['removed']:
|
||||
|
|
|
@ -1013,7 +1013,7 @@ class SecurityGroupAgentEnhancedRpcTestCase(
|
|||
mock.call.defer_apply(),
|
||||
mock.call.remove_port_filter(
|
||||
self.fake_device),
|
||||
])
|
||||
], any_order=True)
|
||||
|
||||
def test_security_groups_rule_updated_enhanced_rpc(self):
|
||||
sg_list = ['fake_sgid1', 'fake_sgid3']
|
||||
|
@ -1074,7 +1074,7 @@ class SecurityGroupAgentEnhancedRpcTestCase(
|
|||
'fake_sgid2', {'IPv4': [], 'IPv6': []}),
|
||||
mock.call.update_port_filter(self.fake_device)]
|
||||
|
||||
self.firewall.assert_has_calls(calls)
|
||||
self.firewall.assert_has_calls(calls, any_order=True)
|
||||
|
||||
def test_refresh_firewall_devices_enhanced_rpc(self):
|
||||
self.agent.prepare_devices_filter(['fake_device'])
|
||||
|
@ -1095,7 +1095,7 @@ class SecurityGroupAgentEnhancedRpcTestCase(
|
|||
'IPv4': [], 'IPv6': []}),
|
||||
mock.call.update_port_filter(self.fake_device)
|
||||
]
|
||||
self.firewall.assert_has_calls(calls)
|
||||
self.firewall.assert_has_calls(calls, any_order=True)
|
||||
|
||||
def test_refresh_firewall_none_enhanced_rpc(self):
|
||||
self.agent.refresh_firewall([])
|
||||
|
|
Loading…
Reference in New Issue