Merge "Minimizing L3 agent QoS extensions lock granularity"

This commit is contained in:
Zuul 2019-07-08 11:21:45 +00:00 committed by Gerrit Code Review
commit e9dce50362
6 changed files with 174 additions and 54 deletions

View File

@ -19,6 +19,7 @@ from neutron_lib import constants
from neutron_lib.db import constants as db_consts
from neutron_lib import rpc as n_rpc
from neutron_lib.services.qos import constants as qos_consts
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron.agent.linux import l3_tc_lib as tc_lib
@ -51,23 +52,44 @@ IP_DEFAULT_BURST = 0
class RateLimitMaps(object):
def __init__(self):
def __init__(self, lock_name):
self.qos_policy_resources = collections.defaultdict(dict)
self.known_policies = {}
self.resource_policies = {}
self.lock_name = lock_name
def update_policy(self, policy):
self.known_policies[policy.id] = policy
@lockutils.synchronized(self.lock_name)
def _update_policy():
self.known_policies[policy.id] = policy
return _update_policy()
def get_policy(self, policy_id):
return self.known_policies.get(policy_id)
@lockutils.synchronized(self.lock_name)
def _get_policy():
return self.known_policies.get(policy_id)
return _get_policy()
def get_resources(self, policy):
return self.qos_policy_resources[policy.id].values()
@lockutils.synchronized(self.lock_name)
def _get_resources():
return self.qos_policy_resources[policy.id].values()
return _get_resources()
def get_resource_policy(self, resource):
policy_id = self.resource_policies.get(resource)
return self.get_policy(policy_id)
@lockutils.synchronized(self.lock_name)
def _get_resource_policy():
policy_id = self.resource_policies.get(resource)
return self.known_policies.get(policy_id)
return _get_resource_policy()
def set_resource_policy(self, resource, policy):
"""Attach a resource to policy
@ -75,12 +97,17 @@ class RateLimitMaps(object):
and return any previous policy on resource.
"""
old_policy = self.get_resource_policy(resource)
self.update_policy(policy)
self.resource_policies[resource] = policy.id
self.qos_policy_resources[policy.id][resource] = resource
if old_policy and old_policy.id != policy.id:
del self.qos_policy_resources[old_policy.id][resource]
@lockutils.synchronized(self.lock_name)
def _set_resource_policy():
policy_id = self.resource_policies.get(resource)
old_policy = self.known_policies.get(policy_id)
self.known_policies[policy.id] = policy
self.resource_policies[resource] = policy.id
self.qos_policy_resources[policy.id][resource] = resource
if old_policy and old_policy.id != policy.id:
del self.qos_policy_resources[old_policy.id][resource]
_set_resource_policy()
def clean_by_resource(self, resource):
"""Detach resource from policy
@ -88,16 +115,21 @@ class RateLimitMaps(object):
and cleanup data we don't need anymore.
"""
if resource in self.resource_policies:
del self.resource_policies[resource]
for qos_policy_id, res_dict in self.qos_policy_resources.items():
if resource in res_dict:
del res_dict[resource]
if not res_dict:
self._clean_policy_info(qos_policy_id)
return
LOG.debug("L3 QoS extension did not have "
"information on floating IP %s", resource)
@lockutils.synchronized(self.lock_name)
def _clean_by_resource():
if resource in self.resource_policies:
del self.resource_policies[resource]
for (qos_policy_id,
res_dict) in self.qos_policy_resources.items():
if resource in res_dict:
del res_dict[resource]
if not res_dict:
self._clean_policy_info(qos_policy_id)
return
LOG.debug("L3 QoS extension did not have "
"information on floating IP %s", resource)
_clean_by_resource()
def _clean_policy_info(self, qos_policy_id):
del self.qos_policy_resources[qos_policy_id]

View File

@ -24,11 +24,14 @@ from neutron.agent.linux import ip_lib
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import coordination
LOG = logging.getLogger(__name__)
class RouterFipRateLimitMaps(qos_base.RateLimitMaps):
LOCK_NAME = "fip-qos-cache"
def __init__(self):
"""Initialize RouterFipRateLimitMaps
@ -51,12 +54,58 @@ class RouterFipRateLimitMaps(qos_base.RateLimitMaps):
"""
self.ingress_ratelimits = {}
self.egress_ratelimits = {}
super(RouterFipRateLimitMaps, self).__init__()
super(RouterFipRateLimitMaps, self).__init__(self.LOCK_NAME)
def find_fip_router_id(self, fip):
for router_id, ips in self.router_floating_ips.items():
if fip in ips:
return router_id
@lockutils.synchronized(self.lock_name)
def _find_fip_router_id():
for router_id, ips in self.router_floating_ips.items():
if fip in ips:
return router_id
return _find_fip_router_id()
def get_router_floating_ips(self, router_id):
@lockutils.synchronized(self.lock_name)
def _get_router_floating_ips():
return self.router_floating_ips.pop(
router_id, [])
return _get_router_floating_ips()
def remove_fip_ratelimit_cache(self, direction, fip):
@lockutils.synchronized(self.lock_name)
def _remove_fip_ratelimit_cache():
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self, rate_limits_direction, {})
rate_limits.pop(fip, None)
_remove_fip_ratelimit_cache()
def set_fip_ratelimit_cache(self, direction, fip, rate, burst):
@lockutils.synchronized(self.lock_name)
def _set_fip_ratelimit_cache():
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self, rate_limits_direction, {})
rate_limits[fip] = (rate, burst)
_set_fip_ratelimit_cache()
def get_fip_ratelimit_cache(self, direction, fip):
@lockutils.synchronized(self.lock_name)
def _get_fip_ratelimit_cache():
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self, rate_limits_direction, {})
rate, burst = rate_limits.get(fip, (qos_base.IP_DEFAULT_RATE,
qos_base.IP_DEFAULT_BURST))
return rate, burst
return _get_fip_ratelimit_cache()
class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
@ -68,7 +117,6 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
self.fip_qos_map = RouterFipRateLimitMaps()
self._register_rpc_consumers()
@lockutils.synchronized('qos-fip')
def _handle_notification(self, context, resource_type,
qos_policies, event_type):
if event_type == events.UPDATED:
@ -98,20 +146,16 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
fip, dvr_fip_device, rates, with_cache=False)
self.fip_qos_map.update_policy(qos_policy)
def _remove_fip_rate_limit_cache(self, fip):
for direction in constants.VALID_DIRECTIONS:
self.fip_qos_map.remove_fip_ratelimit_cache(direction, fip)
def _process_reset_fip(self, fip):
self.fip_qos_map.clean_by_resource(fip)
def process_ip_rate_limit(self, ip, direction, device, rate, burst):
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {})
old_rate, old_burst = rate_limits.get(ip, (qos_base.IP_DEFAULT_RATE,
qos_base.IP_DEFAULT_BURST))
if old_rate == rate and old_burst == burst:
# Two possibilities here:
# 1. Floating IP rate limit does not change.
# 2. Floating IP bandwidth does not limit.
return
@coordination.synchronized('qos-floating-ip-{ip}')
def process_ip_rate_limit(self, ip, direction,
device, rate, burst):
tc_wrapper = self._get_tc_wrapper(device)
@ -121,12 +165,11 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
# floating IP bandwidth was changed to default value (no limit).
# NOTE: l3_tc_lib will ignore exception FilterIDForIPNotFound.
tc_wrapper.clear_ip_rate_limit(direction, ip)
rate_limits.pop(ip, None)
self.fip_qos_map.remove_fip_ratelimit_cache(direction, ip)
return
# Finally just set it, l3_tc_lib will clean the old rules if exists.
tc_wrapper.set_ip_rate_limit(direction, ip, rate, burst)
rate_limits[ip] = (rate, burst)
def _get_rate_limit_ip_device(self, router_info):
ex_gw_port = router_info.get_ex_gw_port()
@ -152,17 +195,12 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
namespace = router_info.get_gw_ns_name()
return ip_lib.IPDevice(name, namespace=namespace)
def _remove_ip_rate_limit_cache(self, ip, direction):
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {})
rate_limits.pop(ip, None)
def _remove_fip_rate_limit(self, device, fip_ip):
tc_wrapper = self._get_tc_wrapper(device)
for direction in constants.VALID_DIRECTIONS:
if device.exists():
tc_wrapper.clear_ip_rate_limit(direction, fip_ip)
self._remove_ip_rate_limit_cache(fip_ip, direction)
self.fip_qos_map.remove_fip_ratelimit_cache(direction, fip_ip)
def get_fip_qos_rates(self, context, fip, policy_id):
if policy_id is None:
@ -184,9 +222,21 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
for direction in constants.VALID_DIRECTIONS:
rate = rates.get(direction)
if with_cache:
old_rate, old_burst = self.fip_qos_map.get_fip_ratelimit_cache(
direction, fip)
if old_rate == rate['rate'] and old_burst == rate['burst']:
# Two possibilities here:
# 1. Floating IP rate limit does not change.
# 2. Floating IP bandwidth does not limit.
continue
self.process_ip_rate_limit(
fip, direction, device,
rate['rate'], rate['burst'])
self.fip_qos_map.set_fip_ratelimit_cache(
direction, fip, rate['rate'], rate['burst'])
else:
tc_wrapper = self._get_tc_wrapper(device)
if (rate['rate'] == qos_base.IP_DEFAULT_RATE and
@ -280,13 +330,11 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
self._remove_fip_rate_limit(dvr_fip_device, fip)
self._process_reset_fip(fip)
@lockutils.synchronized('qos-fip')
def add_router(self, context, data):
router_info = self._get_router_info(data['id'])
if router_info:
self.process_floating_ip_addresses(context, router_info)
@lockutils.synchronized('qos-fip')
def update_router(self, context, data):
router_info = self._get_router_info(data['id'])
if router_info:

View File

@ -17,7 +17,6 @@ import netaddr
from neutron_lib.agent import l3_extension
from neutron_lib import constants
from oslo_concurrency import lockutils
from oslo_log import log as logging
@ -26,6 +25,7 @@ from neutron.agent.linux import ip_lib
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import coordination
LOG = logging.getLogger(__name__)
@ -37,9 +37,9 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
"""Initialize agent extension."""
self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
self._register_rpc_consumers()
self.gateway_ip_qos_map = qos_base.RateLimitMaps()
self.gateway_ip_qos_map = qos_base.RateLimitMaps(
"gateway-ip-qos-cache")
@lockutils.synchronized('qos-gateway-ip')
def _handle_notification(self, context, resource_type,
qos_policies, event_type):
if event_type == events.UPDATED:
@ -81,13 +81,11 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
router_id, qos_policy)
self.gateway_ip_qos_map.update_policy(qos_policy)
@lockutils.synchronized('qos-gateway-ip')
def add_router(self, context, data):
router_info = self._get_router_info(data['id'])
if router_info:
self.process_gateway_rate_limit(context, router_info)
@lockutils.synchronized('qos-gateway-ip')
def update_router(self, context, data):
router_info = self._get_router_info(data['id'])
if router_info:
@ -120,6 +118,7 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
self._handle_router_gateway_rate_limit(context, router_info)
@coordination.synchronized('qos-gateway-ip-{router_info.router_id}')
def _empty_router_gateway_rate_limits(self, router_info, tc_wrapper):
self.gateway_ip_qos_map.clean_by_resource(router_info.router_id)
for ip in router_info.qos_gateway_ips:
@ -172,6 +171,7 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
router_info.router_id, policy)
return self.get_policy_rates(policy)
@coordination.synchronized('qos-gateway-ip-{router_info.router_id}')
def _set_gateway_tc_rules(self, router_info, tc_wrapper,
ex_gw_port, rates):
for ip_addr in ex_gw_port['fixed_ips']:

View File

@ -34,7 +34,7 @@ class RateLimitMapsTestCase(base.BaseTestCase):
def setUp(self):
super(RateLimitMapsTestCase, self).setUp()
self.policy_map = qos_base.RateLimitMaps()
self.policy_map = qos_base.RateLimitMaps("cache-lock")
def test_update_policy(self):
self.policy_map.update_policy(TEST_POLICY)

View File

@ -384,3 +384,37 @@ class RouterFipRateLimitMapsTestCase(base.BaseTestCase):
self.assertIsNone(self.policy_map.find_fip_router_id("8.8.8.8"))
self.assertEqual(router_id,
self.policy_map.find_fip_router_id(TEST_FIP))
def test_get_router_floating_ips(self):
router_id = _uuid()
test_ips = [TEST_FIP, TEST_FIP2]
self.policy_map.router_floating_ips[router_id] = set([TEST_FIP,
TEST_FIP2])
get_ips = self.policy_map.get_router_floating_ips(router_id)
self.assertEqual(len(test_ips), len(get_ips))
def test_remove_fip_ratelimit_cache(self):
fip = "1.1.1.1"
self.policy_map.set_fip_ratelimit_cache(
"ingress", fip, 100, 200)
self.policy_map.set_fip_ratelimit_cache(
"egress", fip, 100, 200)
self.policy_map.remove_fip_ratelimit_cache("ingress", fip)
self.assertIsNone(self.policy_map.ingress_ratelimits.get(fip))
self.policy_map.remove_fip_ratelimit_cache("egress", fip)
self.assertIsNone(self.policy_map.egress_ratelimits.get(fip))
def test_set_fip_ratelimit_cache(self):
fip = "1.1.1.1"
self.policy_map.set_fip_ratelimit_cache(
"ingress", fip, 100, 200)
self.policy_map.set_fip_ratelimit_cache(
"egress", fip, 300, 400)
in_rate, in_burst = self.policy_map.get_fip_ratelimit_cache(
"ingress", fip)
self.assertEqual(100, in_rate)
self.assertEqual(200, in_burst)
e_rate, e_burst = self.policy_map.get_fip_ratelimit_cache(
"egress", fip)
self.assertEqual(300, e_rate)
self.assertEqual(400, e_burst)

View File

@ -0,0 +1,6 @@
---
fixes:
- |
Leverage the coordination lock to the resource processing
and notification thread functions to minimize the lock
granularity.