diff --git a/neutron/agent/l3/extensions/qos/base.py b/neutron/agent/l3/extensions/qos/base.py index 372ddac6eb0..f1f3f15f87e 100644 --- a/neutron/agent/l3/extensions/qos/base.py +++ b/neutron/agent/l3/extensions/qos/base.py @@ -19,6 +19,7 @@ from neutron_lib import constants from neutron_lib.db import constants as db_consts from neutron_lib import rpc as n_rpc from neutron_lib.services.qos import constants as qos_consts +from oslo_concurrency import lockutils from oslo_log import log as logging from neutron.agent.linux import l3_tc_lib as tc_lib @@ -51,23 +52,44 @@ IP_DEFAULT_BURST = 0 class RateLimitMaps(object): - def __init__(self): + def __init__(self, lock_name): self.qos_policy_resources = collections.defaultdict(dict) self.known_policies = {} self.resource_policies = {} + self.lock_name = lock_name def update_policy(self, policy): - self.known_policies[policy.id] = policy + + @lockutils.synchronized(self.lock_name) + def _update_policy(): + self.known_policies[policy.id] = policy + + return _update_policy() def get_policy(self, policy_id): - return self.known_policies.get(policy_id) + + @lockutils.synchronized(self.lock_name) + def _get_policy(): + return self.known_policies.get(policy_id) + + return _get_policy() def get_resources(self, policy): - return self.qos_policy_resources[policy.id].values() + + @lockutils.synchronized(self.lock_name) + def _get_resources(): + return self.qos_policy_resources[policy.id].values() + + return _get_resources() def get_resource_policy(self, resource): - policy_id = self.resource_policies.get(resource) - return self.get_policy(policy_id) + + @lockutils.synchronized(self.lock_name) + def _get_resource_policy(): + policy_id = self.resource_policies.get(resource) + return self.known_policies.get(policy_id) + + return _get_resource_policy() def set_resource_policy(self, resource, policy): """Attach a resource to policy @@ -75,12 +97,17 @@ class RateLimitMaps(object): and return any previous policy on resource. 
""" - old_policy = self.get_resource_policy(resource) - self.update_policy(policy) - self.resource_policies[resource] = policy.id - self.qos_policy_resources[policy.id][resource] = resource - if old_policy and old_policy.id != policy.id: - del self.qos_policy_resources[old_policy.id][resource] + @lockutils.synchronized(self.lock_name) + def _set_resource_policy(): + policy_id = self.resource_policies.get(resource) + old_policy = self.known_policies.get(policy_id) + self.known_policies[policy.id] = policy + self.resource_policies[resource] = policy.id + self.qos_policy_resources[policy.id][resource] = resource + if old_policy and old_policy.id != policy.id: + del self.qos_policy_resources[old_policy.id][resource] + + _set_resource_policy() def clean_by_resource(self, resource): """Detach resource from policy @@ -88,16 +115,21 @@ class RateLimitMaps(object): and cleanup data we don't need anymore. """ - if resource in self.resource_policies: - del self.resource_policies[resource] - for qos_policy_id, res_dict in self.qos_policy_resources.items(): - if resource in res_dict: - del res_dict[resource] - if not res_dict: - self._clean_policy_info(qos_policy_id) - return - LOG.debug("L3 QoS extension did not have " - "information on floating IP %s", resource) + @lockutils.synchronized(self.lock_name) + def _clean_by_resource(): + if resource in self.resource_policies: + del self.resource_policies[resource] + for (qos_policy_id, + res_dict) in self.qos_policy_resources.items(): + if resource in res_dict: + del res_dict[resource] + if not res_dict: + self._clean_policy_info(qos_policy_id) + return + LOG.debug("L3 QoS extension did not have " + "information on floating IP %s", resource) + + _clean_by_resource() def _clean_policy_info(self, qos_policy_id): del self.qos_policy_resources[qos_policy_id] diff --git a/neutron/agent/l3/extensions/qos/fip.py b/neutron/agent/l3/extensions/qos/fip.py index e9b6d6f96f7..f72fbcceb7f 100644 --- a/neutron/agent/l3/extensions/qos/fip.py +++ 
b/neutron/agent/l3/extensions/qos/fip.py @@ -24,11 +24,14 @@ from neutron.agent.linux import ip_lib from neutron.api.rpc.callbacks import events from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc +from neutron.common import coordination LOG = logging.getLogger(__name__) class RouterFipRateLimitMaps(qos_base.RateLimitMaps): + LOCK_NAME = "fip-qos-cache" + def __init__(self): """Initialize RouterFipRateLimitMaps @@ -51,12 +54,58 @@ class RouterFipRateLimitMaps(qos_base.RateLimitMaps): """ self.ingress_ratelimits = {} self.egress_ratelimits = {} - super(RouterFipRateLimitMaps, self).__init__() + super(RouterFipRateLimitMaps, self).__init__(self.LOCK_NAME) def find_fip_router_id(self, fip): - for router_id, ips in self.router_floating_ips.items(): - if fip in ips: - return router_id + + @lockutils.synchronized(self.lock_name) + def _find_fip_router_id(): + for router_id, ips in self.router_floating_ips.items(): + if fip in ips: + return router_id + + return _find_fip_router_id() + + def get_router_floating_ips(self, router_id): + + @lockutils.synchronized(self.lock_name) + def _get_router_floating_ips(): + return self.router_floating_ips.pop( + router_id, []) + + return _get_router_floating_ips() + + def remove_fip_ratelimit_cache(self, direction, fip): + + @lockutils.synchronized(self.lock_name) + def _remove_fip_ratelimit_cache(): + rate_limits_direction = direction + "_ratelimits" + rate_limits = getattr(self, rate_limits_direction, {}) + rate_limits.pop(fip, None) + + _remove_fip_ratelimit_cache() + + def set_fip_ratelimit_cache(self, direction, fip, rate, burst): + + @lockutils.synchronized(self.lock_name) + def _set_fip_ratelimit_cache(): + rate_limits_direction = direction + "_ratelimits" + rate_limits = getattr(self, rate_limits_direction, {}) + rate_limits[fip] = (rate, burst) + + _set_fip_ratelimit_cache() + + def get_fip_ratelimit_cache(self, direction, fip): + + @lockutils.synchronized(self.lock_name) + def 
_get_fip_ratelimit_cache(): + rate_limits_direction = direction + "_ratelimits" + rate_limits = getattr(self, rate_limits_direction, {}) + rate, burst = rate_limits.get(fip, (qos_base.IP_DEFAULT_RATE, + qos_base.IP_DEFAULT_BURST)) + return rate, burst + + return _get_fip_ratelimit_cache() class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, @@ -68,7 +117,6 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, self.fip_qos_map = RouterFipRateLimitMaps() self._register_rpc_consumers() - @lockutils.synchronized('qos-fip') def _handle_notification(self, context, resource_type, qos_policies, event_type): if event_type == events.UPDATED: @@ -98,20 +146,16 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, fip, dvr_fip_device, rates, with_cache=False) self.fip_qos_map.update_policy(qos_policy) + def _remove_fip_rate_limit_cache(self, fip): + for direction in constants.VALID_DIRECTIONS: + self.fip_qos_map.remove_fip_ratelimit_cache(direction, fip) + def _process_reset_fip(self, fip): self.fip_qos_map.clean_by_resource(fip) - def process_ip_rate_limit(self, ip, direction, device, rate, burst): - rate_limits_direction = direction + "_ratelimits" - rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {}) - old_rate, old_burst = rate_limits.get(ip, (qos_base.IP_DEFAULT_RATE, - qos_base.IP_DEFAULT_BURST)) - - if old_rate == rate and old_burst == burst: - # Two possibilities here: - # 1. Floating IP rate limit does not change. - # 2. Floating IP bandwidth does not limit. - return + @coordination.synchronized('qos-floating-ip-{ip}') + def process_ip_rate_limit(self, ip, direction, + device, rate, burst): tc_wrapper = self._get_tc_wrapper(device) @@ -121,12 +165,11 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, # floating IP bandwidth was changed to default value (no limit). # NOTE: l3_tc_lib will ignore exception FilterIDForIPNotFound. 
tc_wrapper.clear_ip_rate_limit(direction, ip) - rate_limits.pop(ip, None) + self.fip_qos_map.remove_fip_ratelimit_cache(direction, ip) return # Finally just set it, l3_tc_lib will clean the old rules if exists. tc_wrapper.set_ip_rate_limit(direction, ip, rate, burst) - rate_limits[ip] = (rate, burst) def _get_rate_limit_ip_device(self, router_info): ex_gw_port = router_info.get_ex_gw_port() @@ -152,17 +195,12 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, namespace = router_info.get_gw_ns_name() return ip_lib.IPDevice(name, namespace=namespace) - def _remove_ip_rate_limit_cache(self, ip, direction): - rate_limits_direction = direction + "_ratelimits" - rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {}) - rate_limits.pop(ip, None) - def _remove_fip_rate_limit(self, device, fip_ip): tc_wrapper = self._get_tc_wrapper(device) for direction in constants.VALID_DIRECTIONS: if device.exists(): tc_wrapper.clear_ip_rate_limit(direction, fip_ip) - self._remove_ip_rate_limit_cache(fip_ip, direction) + self.fip_qos_map.remove_fip_ratelimit_cache(direction, fip_ip) def get_fip_qos_rates(self, context, fip, policy_id): if policy_id is None: @@ -184,9 +222,21 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, for direction in constants.VALID_DIRECTIONS: rate = rates.get(direction) if with_cache: + + old_rate, old_burst = self.fip_qos_map.get_fip_ratelimit_cache( + direction, fip) + if old_rate == rate['rate'] and old_burst == rate['burst']: + # Two possibilities here: + # 1. Floating IP rate limit does not change. + # 2. Floating IP bandwidth does not limit. 
+ continue + self.process_ip_rate_limit( fip, direction, device, rate['rate'], rate['burst']) + + self.fip_qos_map.set_fip_ratelimit_cache( + direction, fip, rate['rate'], rate['burst']) else: tc_wrapper = self._get_tc_wrapper(device) if (rate['rate'] == qos_base.IP_DEFAULT_RATE and @@ -280,13 +330,11 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase, self._remove_fip_rate_limit(dvr_fip_device, fip) self._process_reset_fip(fip) - @lockutils.synchronized('qos-fip') def add_router(self, context, data): router_info = self._get_router_info(data['id']) if router_info: self.process_floating_ip_addresses(context, router_info) - @lockutils.synchronized('qos-fip') def update_router(self, context, data): router_info = self._get_router_info(data['id']) if router_info: diff --git a/neutron/agent/l3/extensions/qos/gateway_ip.py b/neutron/agent/l3/extensions/qos/gateway_ip.py index d8abbc57ece..316127d3657 100644 --- a/neutron/agent/l3/extensions/qos/gateway_ip.py +++ b/neutron/agent/l3/extensions/qos/gateway_ip.py @@ -17,7 +17,6 @@ import netaddr from neutron_lib.agent import l3_extension from neutron_lib import constants -from oslo_concurrency import lockutils from oslo_log import log as logging @@ -26,6 +25,7 @@ from neutron.agent.linux import ip_lib from neutron.api.rpc.callbacks import events from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc +from neutron.common import coordination LOG = logging.getLogger(__name__) @@ -37,9 +37,9 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase, """Initialize agent extension.""" self.resource_rpc = resources_rpc.ResourcesPullRpcApi() self._register_rpc_consumers() - self.gateway_ip_qos_map = qos_base.RateLimitMaps() + self.gateway_ip_qos_map = qos_base.RateLimitMaps( + "gateway-ip-qos-cache") - @lockutils.synchronized('qos-gateway-ip') def _handle_notification(self, context, resource_type, qos_policies, event_type): if event_type == events.UPDATED: @@ 
-81,13 +81,11 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase, router_id, qos_policy) self.gateway_ip_qos_map.update_policy(qos_policy) - @lockutils.synchronized('qos-gateway-ip') def add_router(self, context, data): router_info = self._get_router_info(data['id']) if router_info: self.process_gateway_rate_limit(context, router_info) - @lockutils.synchronized('qos-gateway-ip') def update_router(self, context, data): router_info = self._get_router_info(data['id']) if router_info: @@ -120,6 +118,7 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase, self._handle_router_gateway_rate_limit(context, router_info) + @coordination.synchronized('qos-gateway-ip-{router_info.router_id}') def _empty_router_gateway_rate_limits(self, router_info, tc_wrapper): self.gateway_ip_qos_map.clean_by_resource(router_info.router_id) for ip in router_info.qos_gateway_ips: @@ -172,6 +171,7 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase, router_info.router_id, policy) return self.get_policy_rates(policy) + @coordination.synchronized('qos-gateway-ip-{router_info.router_id}') def _set_gateway_tc_rules(self, router_info, tc_wrapper, ex_gw_port, rates): for ip_addr in ex_gw_port['fixed_ips']: diff --git a/neutron/tests/unit/agent/l3/extensions/qos/test_base.py b/neutron/tests/unit/agent/l3/extensions/qos/test_base.py index 5c626cfaf8a..e43fafb9970 100644 --- a/neutron/tests/unit/agent/l3/extensions/qos/test_base.py +++ b/neutron/tests/unit/agent/l3/extensions/qos/test_base.py @@ -34,7 +34,7 @@ class RateLimitMapsTestCase(base.BaseTestCase): def setUp(self): super(RateLimitMapsTestCase, self).setUp() - self.policy_map = qos_base.RateLimitMaps() + self.policy_map = qos_base.RateLimitMaps("cache-lock") def test_update_policy(self): self.policy_map.update_policy(TEST_POLICY) diff --git a/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py b/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py index 
09ee853e90f..2def99ead6b 100644 --- a/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py +++ b/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py @@ -384,3 +384,37 @@ class RouterFipRateLimitMapsTestCase(base.BaseTestCase): self.assertIsNone(self.policy_map.find_fip_router_id("8.8.8.8")) self.assertEqual(router_id, self.policy_map.find_fip_router_id(TEST_FIP)) + + def test_get_router_floating_ips(self): + router_id = _uuid() + test_ips = [TEST_FIP, TEST_FIP2] + self.policy_map.router_floating_ips[router_id] = set([TEST_FIP, + TEST_FIP2]) + get_ips = self.policy_map.get_router_floating_ips(router_id) + self.assertEqual(len(test_ips), len(get_ips)) + + def test_remove_fip_ratelimit_cache(self): + fip = "1.1.1.1" + self.policy_map.set_fip_ratelimit_cache( + "ingress", fip, 100, 200) + self.policy_map.set_fip_ratelimit_cache( + "egress", fip, 100, 200) + self.policy_map.remove_fip_ratelimit_cache("ingress", fip) + self.assertIsNone(self.policy_map.ingress_ratelimits.get(fip)) + self.policy_map.remove_fip_ratelimit_cache("egress", fip) + self.assertIsNone(self.policy_map.egress_ratelimits.get(fip)) + + def test_set_fip_ratelimit_cache(self): + fip = "1.1.1.1" + self.policy_map.set_fip_ratelimit_cache( + "ingress", fip, 100, 200) + self.policy_map.set_fip_ratelimit_cache( + "egress", fip, 300, 400) + in_rate, in_burst = self.policy_map.get_fip_ratelimit_cache( + "ingress", fip) + self.assertEqual(100, in_rate) + self.assertEqual(200, in_burst) + e_rate, e_burst = self.policy_map.get_fip_ratelimit_cache( + "egress", fip) + self.assertEqual(300, e_rate) + self.assertEqual(400, e_burst) diff --git a/releasenotes/notes/Minimizing-lock-granularity-8bc2f893d9389cf8.yaml b/releasenotes/notes/Minimizing-lock-granularity-8bc2f893d9389cf8.yaml new file mode 100644 index 00000000000..b76c6d22803 --- /dev/null +++ b/releasenotes/notes/Minimizing-lock-granularity-8bc2f893d9389cf8.yaml @@ -0,0 +1,6 @@ +--- +fixes: + - | + The L3 agent QoS extensions now apply the coordination lock to the
+ resource processing and notification thread functions to minimize + the lock granularity.