From b0cdbc84abc268cdca8c330e168abc891c12aafe Mon Sep 17 00:00:00 2001 From: Carl Baldwin Date: Mon, 1 Dec 2014 16:49:10 -0500 Subject: [PATCH] Move classes out of l3_agent.py The file l3_agent.py has become too large. This patch is a simple pure refactor to move some of the functionality in to other files where things aren't too tangled up. There is no functional change with this patch and I avoided gratuitous other fixups in this patch in order to make it easier to review. I plan to follow up on the new l3_dvr and l3_agent_router modules with more restructuring in the near future. Partially-Implements: bp restructure-l3-agent Change-Id: I3529fe4146c50c940f41eb26d0b5efc5870b3af9 --- neutron/agent/l3/__init__.py | 0 neutron/agent/{l3_agent.py => l3/agent.py} | 346 ++---------------- neutron/agent/{l3_ha_agent.py => l3/ha.py} | 0 neutron/agent/l3/link_local_allocator.py | 109 ++++++ neutron/agent/l3/router_info.py | 72 ++++ neutron/agent/l3/router_processing_queue.py | 162 ++++++++ neutron/agent/netns_cleanup_util.py | 2 +- neutron/agent/ovs_cleanup_util.py | 4 +- neutron/tests/common/agents/l3_agent.py | 4 +- .../tests/functional/agent/test_l3_agent.py | 4 +- neutron/tests/unit/test_l3_agent.py | 270 +++----------- neutron/tests/unit/test_l3_dvr.py | 96 +++++ .../unit/test_router_processing_queue.py | 102 ++++++ setup.cfg | 2 +- 14 files changed, 637 insertions(+), 536 deletions(-) create mode 100644 neutron/agent/l3/__init__.py rename neutron/agent/{l3_agent.py => l3/agent.py} (86%) rename neutron/agent/{l3_ha_agent.py => l3/ha.py} (100%) create mode 100644 neutron/agent/l3/link_local_allocator.py create mode 100644 neutron/agent/l3/router_info.py create mode 100644 neutron/agent/l3/router_processing_queue.py create mode 100644 neutron/tests/unit/test_l3_dvr.py create mode 100644 neutron/tests/unit/test_router_processing_queue.py diff --git a/neutron/agent/l3/__init__.py b/neutron/agent/l3/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/neutron/agent/l3_agent.py b/neutron/agent/l3/agent.py similarity index 86% rename from neutron/agent/l3_agent.py rename to neutron/agent/l3/agent.py index acd4d67b2c..9ddf430f92 100644 --- a/neutron/agent/l3_agent.py +++ b/neutron/agent/l3/agent.py @@ -15,7 +15,6 @@ import sys -import datetime import eventlet eventlet.monkey_patch() @@ -26,10 +25,12 @@ from oslo import messaging from oslo.utils import excutils from oslo.utils import importutils from oslo.utils import timeutils -import Queue from neutron.agent.common import config -from neutron.agent import l3_ha_agent +from neutron.agent.l3 import ha +from neutron.agent.l3 import link_local_allocator as lla +from neutron.agent.l3 import router_info +from neutron.agent.l3 import router_processing_queue as queue from neutron.agent.linux import external_process from neutron.agent.linux import interface from neutron.agent.linux import ip_lib @@ -77,10 +78,6 @@ FIP_PR_START = 32768 FIP_PR_END = FIP_PR_START + 40000 RPC_LOOP_INTERVAL = 1 FLOATING_IP_CIDR_SUFFIX = '/32' -# Lower value is higher priority -PRIORITY_RPC = 0 -PRIORITY_SYNC_ROUTERS_TASK = 1 -DELETE_ROUTER = 1 class L3PluginApi(object): @@ -142,295 +139,8 @@ class L3PluginApi(object): return cctxt.call(context, 'get_service_plugin_list') -class LinkLocalAddressPair(netaddr.IPNetwork): - def __init__(self, addr): - super(LinkLocalAddressPair, self).__init__(addr) - - def get_pair(self): - """Builds an address pair from the first and last addresses. """ - return (netaddr.IPNetwork("%s/%s" % (self.network, self.prefixlen)), - netaddr.IPNetwork("%s/%s" % (self.broadcast, self.prefixlen))) - - -class LinkLocalAllocator(object): - """Manages allocation of link local IP addresses. - - These link local addresses are used for routing inside the fip namespaces. - The associations need to persist across agent restarts to maintain - consistency. Without this, there is disruption in network connectivity - as the agent rewires the connections with the new IP address assocations. - - Persisting these in the database is unnecessary and would degrade - performance. - """ - def __init__(self, state_file, subnet): - """Read the file with previous allocations recorded. - - See the note in the allocate method for more detail. - """ - self.state_file = state_file - subnet = netaddr.IPNetwork(subnet) - - self.allocations = {} - - self.remembered = {} - for line in self._read(): - key, cidr = line.strip().split(',') - self.remembered[key] = LinkLocalAddressPair(cidr) - - self.pool = set(LinkLocalAddressPair(s) for s in subnet.subnet(31)) - self.pool.difference_update(self.remembered.values()) - - def allocate(self, key): - """Try to allocate a link local address pair. - - I expect this to work in all cases because I expect the pool size to be - large enough for any situation. Nonetheless, there is some defensive - programming in here. - - Since the allocations are persisted, there is the chance to leak - allocations which should have been released but were not. This leak - could eventually exhaust the pool. - - So, if a new allocation is needed, the code first checks to see if - there are any remembered allocations for the key. If not, it checks - the free pool. If the free pool is empty then it dumps the remembered - allocations to free the pool. This final desparate step will not - happen often in practice. - """ - if key in self.remembered: - self.allocations[key] = self.remembered.pop(key) - return self.allocations[key] - - if not self.pool: - # Desparate times. Try to get more in the pool. - self.pool.update(self.remembered.values()) - self.remembered.clear() - if not self.pool: - # More than 256 routers on a compute node! - raise RuntimeError(_("Cannot allocate link local address")) - - self.allocations[key] = self.pool.pop() - self._write_allocations() - return self.allocations[key] - - def release(self, key): - self.pool.add(self.allocations.pop(key)) - self._write_allocations() - - def _write_allocations(self): - current = ["%s,%s\n" % (k, v) for k, v in self.allocations.items()] - remembered = ["%s,%s\n" % (k, v) for k, v in self.remembered.items()] - current.extend(remembered) - self._write(current) - - def _write(self, lines): - with open(self.state_file, "w") as f: - f.writelines(lines) - - def _read(self): - if not os.path.exists(self.state_file): - return [] - with open(self.state_file) as f: - return f.readlines() - - -class RouterInfo(l3_ha_agent.RouterMixin): - - def __init__(self, router_id, root_helper, router, - use_ipv6=False, ns_name=None): - self.router_id = router_id - self.ex_gw_port = None - self._snat_enabled = None - self._snat_action = None - self.internal_ports = [] - self.snat_ports = [] - self.floating_ips = set() - self.floating_ips_dict = {} - self.root_helper = root_helper - # Invoke the setter for establishing initial SNAT action - self.router = router - self.ns_name = ns_name - self.iptables_manager = iptables_manager.IptablesManager( - root_helper=root_helper, - use_ipv6=use_ipv6, - namespace=self.ns_name) - self.snat_iptables_manager = None - self.routes = [] - # DVR Data - # Linklocal subnet for router and floating IP namespace link - self.rtr_fip_subnet = None - self.dist_fip_count = 0 - - super(RouterInfo, self).__init__() - - @property - def router(self): - return self._router - - @router.setter - def router(self, value): - self._router = value - if not self._router: - return - # enable_snat by default if it wasn't specified by plugin - self._snat_enabled = self._router.get('enable_snat', True) - # Set a SNAT action for the router - if self._router.get('gw_port'): - self._snat_action = ('add_rules' if self._snat_enabled - else 'remove_rules') - elif self.ex_gw_port: - # Gateway port was removed, remove rules - self._snat_action = 'remove_rules' - - def perform_snat_action(self, snat_callback, *args): - # Process SNAT rules for attached subnets - if self._snat_action: - snat_callback(self, self._router.get('gw_port'), - *args, action=self._snat_action) - self._snat_action = None - - -class RouterUpdate(object): - """Encapsulates a router update - - An instance of this object carries the information necessary to prioritize - and process a request to update a router. - """ - def __init__(self, router_id, priority, - action=None, router=None, timestamp=None): - self.priority = priority - self.timestamp = timestamp - if not timestamp: - self.timestamp = timeutils.utcnow() - self.id = router_id - self.action = action - self.router = router - - def __lt__(self, other): - """Implements priority among updates - - Lower numerical priority always gets precedence. When comparing two - updates of the same priority then the one with the earlier timestamp - gets procedence. In the unlikely event that the timestamps are also - equal it falls back to a simple comparison of ids meaning the - precedence is essentially random. - """ - if self.priority != other.priority: - return self.priority < other.priority - if self.timestamp != other.timestamp: - return self.timestamp < other.timestamp - return self.id < other.id - - -class ExclusiveRouterProcessor(object): - """Manager for access to a router for processing - - This class controls access to a router in a non-blocking way. The first - instance to be created for a given router_id is granted exclusive access to - the router. - - Other instances may be created for the same router_id while the first - instance has exclusive access. If that happens then it doesn't block and - wait for access. Instead, it signals to the master instance that an update - came in with the timestamp. - - This way, a thread will not block to wait for access to a router. Instead - it effectively signals to the thread that is working on the router that - something has changed since it started working on it. That thread will - simply finish its current iteration and then repeat. - - This class keeps track of the last time that a router data was fetched and - processed. The timestamp that it keeps must be before when the data used - to process the router last was fetched from the database. But, as close as - possible. The timestamp should not be recorded, however, until the router - has been processed using the fetch data. - """ - _masters = {} - _router_timestamps = {} - - def __init__(self, router_id): - self._router_id = router_id - - if router_id not in self._masters: - self._masters[router_id] = self - self._queue = [] - - self._master = self._masters[router_id] - - def _i_am_master(self): - return self == self._master - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - if self._i_am_master(): - del self._masters[self._router_id] - - def _get_router_data_timestamp(self): - return self._router_timestamps.get(self._router_id, - datetime.datetime.min) - - def fetched_and_processed(self, timestamp): - """Records the data timestamp after it is used to update the router""" - new_timestamp = max(timestamp, self._get_router_data_timestamp()) - self._router_timestamps[self._router_id] = new_timestamp - - def queue_update(self, update): - """Queues an update from a worker - - This is the queue used to keep new updates that come in while a router - is being processed. These updates have already bubbled to the front of - the RouterProcessingQueue. - """ - self._master._queue.append(update) - - def updates(self): - """Processes the router until updates stop coming - - Only the master instance will process the router. However, updates may - come in from other workers while it is in progress. This method loops - until they stop coming. - """ - if self._i_am_master(): - while self._queue: - # Remove the update from the queue even if it is old. - update = self._queue.pop(0) - # Process the update only if it is fresh. - if self._get_router_data_timestamp() < update.timestamp: - yield update - - -class RouterProcessingQueue(object): - """Manager of the queue of routers to process.""" - def __init__(self): - self._queue = Queue.PriorityQueue() - - def add(self, update): - self._queue.put(update) - - def each_update_to_next_router(self): - """Grabs the next router from the queue and processes - - This method uses a for loop to process the router repeatedly until - updates stop bubbling to the front of the queue. - """ - next_update = self._queue.get() - - with ExclusiveRouterProcessor(next_update.id) as rp: - # Queue the update whether this worker is the master or not. - rp.queue_update(next_update) - - # Here, if the current worker is not the master, the call to - # rp.updates() will not yield and so this will essentially be a - # noop. - for update in rp.updates(): - yield (rp, update) - - class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, - l3_ha_agent.AgentMixin, + ha.AgentMixin, manager.Manager): """Manager for L3NatAgent @@ -554,12 +264,12 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, # dvr data self.agent_gateway_port = None self.fip_ns_subscribers = set() - self.local_subnets = LinkLocalAllocator( + self.local_subnets = lla.LinkLocalAllocator( os.path.join(self.conf.state_path, 'fip-linklocal-networks'), FIP_LL_SUBNET) self.fip_priorities = set(range(FIP_PR_START, FIP_PR_END)) - self._queue = RouterProcessingQueue() + self._queue = queue.RouterProcessingQueue() super(L3NATAgent, self).__init__(conf=self.conf) self.target_ex_net_id = None @@ -745,11 +455,11 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, def _router_added(self, router_id, router): ns_name = (self.get_ns_name(router_id) if self.conf.use_namespaces else None) - ri = RouterInfo(router_id=router_id, - root_helper=self.root_helper, - router=router, - use_ipv6=self.use_ipv6, - ns_name=ns_name) + ri = router_info.RouterInfo(router_id=router_id, + root_helper=self.root_helper, + router=router, + use_ipv6=self.use_ipv6, + ns_name=ns_name) self.router_info[router_id] = ri if self.conf.use_namespaces: self._create_router_namespace(ri) @@ -1698,7 +1408,9 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, def router_deleted(self, context, router_id): """Deal with router deletion RPC message.""" LOG.debug('Got router deleted notification for %s', router_id) - update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER) + update = queue.RouterUpdate(router_id, + queue.PRIORITY_RPC, + action=queue.DELETE_ROUTER) self._queue.add(update) def _update_arp_entry(self, ri, ip, mac, subnet_id, operation): @@ -1751,13 +1463,15 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, if isinstance(routers[0], dict): routers = [router['id'] for router in routers] for id in routers: - update = RouterUpdate(id, PRIORITY_RPC) + update = queue.RouterUpdate(id, queue.PRIORITY_RPC) self._queue.add(update) def router_removed_from_agent(self, context, payload): LOG.debug('Got router removed from agent :%r', payload) router_id = payload['router_id'] - update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER) + update = queue.RouterUpdate(router_id, + queue.PRIORITY_RPC, + action=queue.DELETE_ROUTER) self._queue.add(update) def router_added_to_agent(self, context, payload): @@ -1801,7 +1515,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, for rp, update in self._queue.each_update_to_next_router(): LOG.debug("Starting router update for %s", update.id) router = update.router - if update.action != DELETE_ROUTER and not router: + if update.action != queue.DELETE_ROUTER and not router: try: update.timestamp = timeutils.utcnow() routers = self.plugin_rpc.get_routers(self.context, @@ -1870,10 +1584,10 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, else: LOG.debug('Processing :%r', routers) for r in routers: - update = RouterUpdate(r['id'], - PRIORITY_SYNC_ROUTERS_TASK, - router=r, - timestamp=timestamp) + update = queue.RouterUpdate(r['id'], + queue.PRIORITY_SYNC_ROUTERS_TASK, + router=r, + timestamp=timestamp) self._queue.add(update) self.fullsync = False LOG.debug("periodic_sync_routers_task successfully completed") @@ -1884,10 +1598,10 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, # Two kinds of stale routers: Routers for which info is cached in # self.router_info and the others. First, handle the former. for router_id in prev_router_ids - curr_router_ids: - update = RouterUpdate(router_id, - PRIORITY_SYNC_ROUTERS_TASK, - timestamp=timestamp, - action=DELETE_ROUTER) + update = queue.RouterUpdate(router_id, + queue.PRIORITY_SYNC_ROUTERS_TASK, + timestamp=timestamp, + action=queue.DELETE_ROUTER) self._queue.add(update) # Next, one effort to clean out namespaces for which we don't have @@ -2001,7 +1715,7 @@ class L3NATAgentWithStateReport(L3NATAgent): def _register_opts(conf): conf.register_opts(L3NATAgent.OPTS) - conf.register_opts(l3_ha_agent.OPTS) + conf.register_opts(ha.OPTS) config.register_interface_driver_opts_helper(conf) config.register_use_namespaces_opts_helper(conf) config.register_agent_state_opts_helper(conf) @@ -2010,7 +1724,7 @@ def _register_opts(conf): conf.register_opts(external_process.OPTS) -def main(manager='neutron.agent.l3_agent.L3NATAgentWithStateReport'): +def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'): _register_opts(cfg.CONF) common_config.init(sys.argv[1:]) config.setup_logging() diff --git a/neutron/agent/l3_ha_agent.py b/neutron/agent/l3/ha.py similarity index 100% rename from neutron/agent/l3_ha_agent.py rename to neutron/agent/l3/ha.py diff --git a/neutron/agent/l3/link_local_allocator.py b/neutron/agent/l3/link_local_allocator.py new file mode 100644 index 0000000000..594daa716e --- /dev/null +++ b/neutron/agent/l3/link_local_allocator.py @@ -0,0 +1,109 @@ +# Copyright 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import netaddr +import os + + +class LinkLocalAddressPair(netaddr.IPNetwork): + def __init__(self, addr): + super(LinkLocalAddressPair, self).__init__(addr) + + def get_pair(self): + """Builds an address pair from the first and last addresses. """ + return (netaddr.IPNetwork("%s/%s" % (self.network, self.prefixlen)), + netaddr.IPNetwork("%s/%s" % (self.broadcast, self.prefixlen))) + + +class LinkLocalAllocator(object): + """Manages allocation of link local IP addresses. + + These link local addresses are used for routing inside the fip namespaces. + The associations need to persist across agent restarts to maintain + consistency. Without this, there is disruption in network connectivity + as the agent rewires the connections with the new IP address assocations. + + Persisting these in the database is unnecessary and would degrade + performance. + """ + def __init__(self, state_file, subnet): + """Read the file with previous allocations recorded. + + See the note in the allocate method for more detail. + """ + self.state_file = state_file + subnet = netaddr.IPNetwork(subnet) + + self.allocations = {} + + self.remembered = {} + for line in self._read(): + key, cidr = line.strip().split(',') + self.remembered[key] = LinkLocalAddressPair(cidr) + + self.pool = set(LinkLocalAddressPair(s) for s in subnet.subnet(31)) + self.pool.difference_update(self.remembered.values()) + + def allocate(self, key): + """Try to allocate a link local address pair. + + I expect this to work in all cases because I expect the pool size to be + large enough for any situation. Nonetheless, there is some defensive + programming in here. + + Since the allocations are persisted, there is the chance to leak + allocations which should have been released but were not. This leak + could eventually exhaust the pool. + + So, if a new allocation is needed, the code first checks to see if + there are any remembered allocations for the key. If not, it checks + the free pool. If the free pool is empty then it dumps the remembered + allocations to free the pool. This final desparate step will not + happen often in practice. + """ + if key in self.remembered: + self.allocations[key] = self.remembered.pop(key) + return self.allocations[key] + + if not self.pool: + # Desparate times. Try to get more in the pool. + self.pool.update(self.remembered.values()) + self.remembered.clear() + if not self.pool: + # More than 256 routers on a compute node! + raise RuntimeError(_("Cannot allocate link local address")) + + self.allocations[key] = self.pool.pop() + self._write_allocations() + return self.allocations[key] + + def release(self, key): + self.pool.add(self.allocations.pop(key)) + self._write_allocations() + + def _write_allocations(self): + current = ["%s,%s\n" % (k, v) for k, v in self.allocations.items()] + remembered = ["%s,%s\n" % (k, v) for k, v in self.remembered.items()] + current.extend(remembered) + self._write(current) + + def _write(self, lines): + with open(self.state_file, "w") as f: + f.writelines(lines) + + def _read(self): + if not os.path.exists(self.state_file): + return [] + with open(self.state_file) as f: + return f.readlines() diff --git a/neutron/agent/l3/router_info.py b/neutron/agent/l3/router_info.py new file mode 100644 index 0000000000..2999ed8534 --- /dev/null +++ b/neutron/agent/l3/router_info.py @@ -0,0 +1,72 @@ +# Copyright (c) 2014 Openstack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.agent.l3 import ha +from neutron.agent.linux import iptables_manager + + +class RouterInfo(ha.RouterMixin): + + def __init__(self, router_id, root_helper, router, + use_ipv6=False, ns_name=None): + self.router_id = router_id + self.ex_gw_port = None + self._snat_enabled = None + self._snat_action = None + self.internal_ports = [] + self.snat_ports = [] + self.floating_ips = set() + self.floating_ips_dict = {} + self.root_helper = root_helper + # Invoke the setter for establishing initial SNAT action + self.router = router + self.ns_name = ns_name + self.iptables_manager = iptables_manager.IptablesManager( + root_helper=root_helper, + use_ipv6=use_ipv6, + namespace=self.ns_name) + self.snat_iptables_manager = None + self.routes = [] + # DVR Data + # Linklocal subnet for router and floating IP namespace link + self.rtr_fip_subnet = None + self.dist_fip_count = 0 + + super(RouterInfo, self).__init__() + + @property + def router(self): + return self._router + + @router.setter + def router(self, value): + self._router = value + if not self._router: + return + # enable_snat by default if it wasn't specified by plugin + self._snat_enabled = self._router.get('enable_snat', True) + # Set a SNAT action for the router + if self._router.get('gw_port'): + self._snat_action = ('add_rules' if self._snat_enabled + else 'remove_rules') + elif self.ex_gw_port: + # Gateway port was removed, remove rules + self._snat_action = 'remove_rules' + + def perform_snat_action(self, snat_callback, *args): + # Process SNAT rules for attached subnets + if self._snat_action: + snat_callback(self, self._router.get('gw_port'), + *args, action=self._snat_action) + self._snat_action = None diff --git a/neutron/agent/l3/router_processing_queue.py b/neutron/agent/l3/router_processing_queue.py new file mode 100644 index 0000000000..a206437ea6 --- /dev/null +++ b/neutron/agent/l3/router_processing_queue.py @@ -0,0 +1,162 @@ +# Copyright 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import datetime +import Queue + +from oslo.utils import timeutils + +# Lower value is higher priority +PRIORITY_RPC = 0 +PRIORITY_SYNC_ROUTERS_TASK = 1 +DELETE_ROUTER = 1 + + +class RouterUpdate(object): + """Encapsulates a router update + + An instance of this object carries the information necessary to prioritize + and process a request to update a router. + """ + def __init__(self, router_id, priority, + action=None, router=None, timestamp=None): + self.priority = priority + self.timestamp = timestamp + if not timestamp: + self.timestamp = timeutils.utcnow() + self.id = router_id + self.action = action + self.router = router + + def __lt__(self, other): + """Implements priority among updates + + Lower numerical priority always gets precedence. When comparing two + updates of the same priority then the one with the earlier timestamp + gets procedence. In the unlikely event that the timestamps are also + equal it falls back to a simple comparison of ids meaning the + precedence is essentially random. + """ + if self.priority != other.priority: + return self.priority < other.priority + if self.timestamp != other.timestamp: + return self.timestamp < other.timestamp + return self.id < other.id + + +class ExclusiveRouterProcessor(object): + """Manager for access to a router for processing + + This class controls access to a router in a non-blocking way. The first + instance to be created for a given router_id is granted exclusive access to + the router. + + Other instances may be created for the same router_id while the first + instance has exclusive access. If that happens then it doesn't block and + wait for access. Instead, it signals to the master instance that an update + came in with the timestamp. + + This way, a thread will not block to wait for access to a router. Instead + it effectively signals to the thread that is working on the router that + something has changed since it started working on it. That thread will + simply finish its current iteration and then repeat. + + This class keeps track of the last time that a router data was fetched and + processed. The timestamp that it keeps must be before when the data used + to process the router last was fetched from the database. But, as close as + possible. The timestamp should not be recorded, however, until the router + has been processed using the fetch data. + """ + _masters = {} + _router_timestamps = {} + + def __init__(self, router_id): + self._router_id = router_id + + if router_id not in self._masters: + self._masters[router_id] = self + self._queue = [] + + self._master = self._masters[router_id] + + def _i_am_master(self): + return self == self._master + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + if self._i_am_master(): + del self._masters[self._router_id] + + def _get_router_data_timestamp(self): + return self._router_timestamps.get(self._router_id, + datetime.datetime.min) + + def fetched_and_processed(self, timestamp): + """Records the data timestamp after it is used to update the router""" + new_timestamp = max(timestamp, self._get_router_data_timestamp()) + self._router_timestamps[self._router_id] = new_timestamp + + def queue_update(self, update): + """Queues an update from a worker + + This is the queue used to keep new updates that come in while a router + is being processed. These updates have already bubbled to the front of + the RouterProcessingQueue. + """ + self._master._queue.append(update) + + def updates(self): + """Processes the router until updates stop coming + + Only the master instance will process the router. However, updates may + come in from other workers while it is in progress. This method loops + until they stop coming. + """ + if self._i_am_master(): + while self._queue: + # Remove the update from the queue even if it is old. + update = self._queue.pop(0) + # Process the update only if it is fresh. + if self._get_router_data_timestamp() < update.timestamp: + yield update + + +class RouterProcessingQueue(object): + """Manager of the queue of routers to process.""" + def __init__(self): + self._queue = Queue.PriorityQueue() + + def add(self, update): + self._queue.put(update) + + def each_update_to_next_router(self): + """Grabs the next router from the queue and processes + + This method uses a for loop to process the router repeatedly until + updates stop bubbling to the front of the queue. + """ + next_update = self._queue.get() + + with ExclusiveRouterProcessor(next_update.id) as rp: + # Queue the update whether this worker is the master or not. + rp.queue_update(next_update) + + # Here, if the current worker is not the master, the call to + # rp.updates() will not yield and so this will essentially be a + # noop. + for update in rp.updates(): + yield (rp, update) diff --git a/neutron/agent/netns_cleanup_util.py b/neutron/agent/netns_cleanup_util.py index 60e3f82fd5..5f3d6dca6c 100644 --- a/neutron/agent/netns_cleanup_util.py +++ b/neutron/agent/netns_cleanup_util.py @@ -23,7 +23,7 @@ from oslo.utils import importutils from neutron.agent.common import config as agent_config from neutron.agent import dhcp_agent -from neutron.agent import l3_agent +from neutron.agent.l3 import agent as l3_agent from neutron.agent.linux import dhcp from neutron.agent.linux import interface from neutron.agent.linux import ip_lib diff --git a/neutron/agent/ovs_cleanup_util.py b/neutron/agent/ovs_cleanup_util.py index 9d365abbaa..2f9ca68ca4 100644 --- a/neutron/agent/ovs_cleanup_util.py +++ b/neutron/agent/ovs_cleanup_util.py @@ -16,7 +16,7 @@ from oslo.config import cfg from neutron.agent.common import config as agent_config -from neutron.agent import l3_agent +from neutron.agent.l3 import agent from neutron.agent.linux import interface from neutron.agent.linux import ip_lib from neutron.agent.linux import ovs_lib @@ -45,7 +45,7 @@ def setup_conf(): conf = cfg.CONF conf.register_cli_opts(opts) - conf.register_opts(l3_agent.L3NATAgent.OPTS) + conf.register_opts(agent.L3NATAgent.OPTS) conf.register_opts(interface.OPTS) agent_config.register_interface_driver_opts_helper(conf) agent_config.register_use_namespaces_opts_helper(conf) diff --git a/neutron/tests/common/agents/l3_agent.py b/neutron/tests/common/agents/l3_agent.py index 92ad587fa4..529ecefe06 100644 --- a/neutron/tests/common/agents/l3_agent.py +++ b/neutron/tests/common/agents/l3_agent.py @@ -13,10 +13,10 @@ # under the License. -from neutron.agent import l3_agent +from neutron.agent.l3 import agent -class TestL3NATAgent(l3_agent.L3NATAgentWithStateReport): +class TestL3NATAgent(agent.L3NATAgentWithStateReport): NESTED_NAMESPACE_SEPARATOR = '@' def get_ns_name(self, router_id): diff --git a/neutron/tests/functional/agent/test_l3_agent.py b/neutron/tests/functional/agent/test_l3_agent.py index 99f1d1edd4..e7b86d5b39 100644 --- a/neutron/tests/functional/agent/test_l3_agent.py +++ b/neutron/tests/functional/agent/test_l3_agent.py @@ -20,7 +20,7 @@ import mock from oslo.config import cfg from neutron.agent.common import config as agent_config -from neutron.agent import l3_agent +from neutron.agent.l3 import agent as l3_agent from neutron.agent.linux import external_process from neutron.agent.linux import ip_lib from neutron.common import config as common_config @@ -39,7 +39,7 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase): def setUp(self): super(L3AgentTestFramework, self).setUp() self.check_sudo_enabled() - mock.patch('neutron.agent.l3_agent.L3PluginApi').start() + mock.patch('neutron.agent.l3.agent.L3PluginApi').start() self.agent = self._configure_agent('agent1') def _get_config_opts(self): diff --git a/neutron/tests/unit/test_l3_agent.py b/neutron/tests/unit/test_l3_agent.py index 54ae156cf6..e50eda7cc0 100644 --- a/neutron/tests/unit/test_l3_agent.py +++ b/neutron/tests/unit/test_l3_agent.py @@ -15,7 +15,6 @@ import contextlib import copy -import datetime import mock import netaddr @@ -24,8 +23,10 @@ from oslo import messaging from testtools import matchers from neutron.agent.common import config as agent_config -from neutron.agent import l3_agent -from neutron.agent import l3_ha_agent +from neutron.agent.l3 import agent as l3_agent +from neutron.agent.l3 import ha +from neutron.agent.l3 import link_local_allocator as lla +from neutron.agent.l3 import router_info as l3router from neutron.agent.linux import interface from neutron.common import config as base_config from neutron.common import constants as l3_constants @@ -49,161 +50,6 @@ class FakeDev(object): self.name = name -class TestExclusiveRouterProcessor(base.BaseTestCase): - def setUp(self): - super(TestExclusiveRouterProcessor, self).setUp() - - def test_i_am_master(self): - master = l3_agent.ExclusiveRouterProcessor(FAKE_ID) - not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID) - master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2) - not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2) - - self.assertTrue(master._i_am_master()) - self.assertFalse(not_master._i_am_master()) - self.assertTrue(master_2._i_am_master()) - self.assertFalse(not_master_2._i_am_master()) - - master.__exit__(None, None, None) - master_2.__exit__(None, None, None) - - def test_master(self): - master = l3_agent.ExclusiveRouterProcessor(FAKE_ID) - not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID) - master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2) - not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2) - - self.assertEqual(master._master, master) - self.assertEqual(not_master._master, master) - self.assertEqual(master_2._master, master_2) - self.assertEqual(not_master_2._master, master_2) - - master.__exit__(None, None, None) - master_2.__exit__(None, None, None) - - def test__enter__(self): - self.assertFalse(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters) - master = l3_agent.ExclusiveRouterProcessor(FAKE_ID) - master.__enter__() - self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters) - master.__exit__(None, None, None) - - def test__exit__(self): - master = l3_agent.ExclusiveRouterProcessor(FAKE_ID) - not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID) - master.__enter__() - self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters) - not_master.__enter__() - not_master.__exit__(None, None, None) - self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters) - master.__exit__(None, None, None) - self.assertFalse(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters) - - def test_data_fetched_since(self): - master = l3_agent.ExclusiveRouterProcessor(FAKE_ID) - self.assertEqual(master._get_router_data_timestamp(), - datetime.datetime.min) - - ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) - ts2 = datetime.datetime.utcnow() - - master.fetched_and_processed(ts2) - self.assertEqual(master._get_router_data_timestamp(), ts2) - master.fetched_and_processed(ts1) - self.assertEqual(master._get_router_data_timestamp(), ts2) - - master.__exit__(None, None, None) - - def test_updates(self): - master = l3_agent.ExclusiveRouterProcessor(FAKE_ID) - not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID) - - master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0)) - not_master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0)) - - for update in not_master.updates(): - raise Exception("Only the master should process a router") - - self.assertEqual(2, len([i for i in master.updates()])) - - -class TestLinkLocalAddrAllocator(base.BaseTestCase): - def setUp(self): - super(TestLinkLocalAddrAllocator, self).setUp() - self.subnet = netaddr.IPNetwork('169.254.31.0/24') - - def test__init__(self): - a = l3_agent.LinkLocalAllocator('/file', self.subnet.cidr) - self.assertEqual('/file', a.state_file) - self.assertEqual({}, a.allocations) - - def test__init__readfile(self): - with mock.patch.object(l3_agent.LinkLocalAllocator, '_read') as read: - read.return_value = ["da873ca2,169.254.31.28/31\n"] - a = l3_agent.LinkLocalAllocator('/file', self.subnet.cidr) - - self.assertTrue('da873ca2' in a.remembered) - self.assertEqual({}, a.allocations) - - def test_allocate(self): - a = l3_agent.LinkLocalAllocator('/file', self.subnet.cidr) - with mock.patch.object(l3_agent.LinkLocalAllocator, '_write') as write: - subnet = a.allocate('deadbeef') - - self.assertTrue('deadbeef' in a.allocations) - self.assertTrue(subnet not in a.pool) - self._check_allocations(a.allocations) - write.assert_called_once_with(['deadbeef,%s\n' % subnet.cidr]) - - def test_allocate_from_file(self): - with mock.patch.object(l3_agent.LinkLocalAllocator, '_read') as read: - read.return_value = ["deadbeef,169.254.31.88/31\n"] - a = l3_agent.LinkLocalAllocator('/file', self.subnet.cidr) - - with mock.patch.object(l3_agent.LinkLocalAllocator, '_write') as write: - subnet = a.allocate('deadbeef') - - self.assertEqual(netaddr.IPNetwork('169.254.31.88/31'), subnet) - self.assertTrue(subnet not in a.pool) - self._check_allocations(a.allocations) - self.assertFalse(write.called) - - def test_allocate_exhausted_pool(self): - subnet = netaddr.IPNetwork('169.254.31.0/31') - with mock.patch.object(l3_agent.LinkLocalAllocator, '_read') as read: - read.return_value = ["deadbeef,169.254.31.0/31\n"] - a = l3_agent.LinkLocalAllocator('/file', subnet.cidr) - - with mock.patch.object(l3_agent.LinkLocalAllocator, '_write') as write: - allocation = a.allocate('abcdef12') - - self.assertEqual(subnet, allocation) - self.assertFalse('deadbeef' in a.allocations) - self.assertTrue('abcdef12' in a.allocations) - self.assertTrue(allocation not in a.pool) - self._check_allocations(a.allocations) - write.assert_called_once_with(['abcdef12,%s\n' % allocation.cidr]) - - self.assertRaises(RuntimeError, a.allocate, 'deadbeef') - - def test_release(self): - with mock.patch.object(l3_agent.LinkLocalAllocator, '_write') as write: - a = l3_agent.LinkLocalAllocator('/file', self.subnet.cidr) - subnet = a.allocate('deadbeef') - write.reset_mock() - a.release('deadbeef') - - self.assertTrue('deadbeef' not in a.allocations) - self.assertTrue(subnet in a.pool) - self.assertEqual({}, a.allocations) - write.assert_called_once_with([]) - - def _check_allocations(self, allocations): - for key, subnet in allocations.items(): - self.assertTrue(subnet in self.subnet) - self.assertEqual(subnet.prefixlen, 31) - - def router_append_interface(router, count=1, ip_version=4, ra_mode=None, addr_mode=None): if ip_version == 4: @@ -318,7 +164,7 @@ class TestBasicRouterOperations(base.BaseTestCase): self.conf = agent_config.setup_conf() self.conf.register_opts(base_config.core_opts) self.conf.register_opts(l3_agent.L3NATAgent.OPTS) - self.conf.register_opts(l3_ha_agent.OPTS) + self.conf.register_opts(ha.OPTS) agent_config.register_interface_driver_opts_helper(self.conf) agent_config.register_use_namespaces_opts_helper(self.conf) agent_config.register_root_helper(self.conf) @@ -334,7 +180,7 @@ class TestBasicRouterOperations(base.BaseTestCase): 'neutron.agent.linux.ip_lib.device_exists') self.device_exists = self.device_exists_p.start() - mock.patch('neutron.agent.l3_ha_agent.AgentMixin' + mock.patch('neutron.agent.l3.ha.AgentMixin' '._init_ha_conf_path').start() mock.patch('neutron.agent.linux.keepalived.KeepalivedNotifierMixin' '._get_full_config_file_path').start() @@ -352,7 +198,7 @@ class TestBasicRouterOperations(base.BaseTestCase): self.external_process = self.external_process_p.start() self.send_arp_p = mock.patch( - 'neutron.agent.l3_agent.L3NATAgent._send_gratuitous_arp_packet') + 'neutron.agent.l3.agent.L3NATAgent._send_gratuitous_arp_packet') self.send_arp = self.send_arp_p.start() self.dvr_cls_p = mock.patch('neutron.agent.linux.interface.NullDriver') @@ -376,7 +222,7 @@ class TestBasicRouterOperations(base.BaseTestCase): ip_dev.return_value = self.mock_ip_dev self.l3pluginApi_cls_p = mock.patch( - 'neutron.agent.l3_agent.L3PluginApi') + 'neutron.agent.l3.agent.L3PluginApi') l3pluginApi_cls = self.l3pluginApi_cls_p.start() self.plugin_api = mock.MagicMock() l3pluginApi_cls.return_value = self.plugin_api @@ -412,7 +258,7 @@ class TestBasicRouterOperations(base.BaseTestCase): network_id = _uuid() router = prepare_router_data(num_internal_ports=2) router_id = router['id'] - ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, + ri = l3router.RouterInfo(router_id, self.conf.root_helper, router=router) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) cidr = '99.0.1.9/24' @@ -440,7 +286,7 @@ class TestBasicRouterOperations(base.BaseTestCase): def test_router_info_create(self): id = _uuid() ns = "ns-" + id - ri = l3_agent.RouterInfo(id, self.conf.root_helper, {}, ns_name=ns) + ri = l3router.RouterInfo(id, self.conf.root_helper, {}, ns_name=ns) self.assertTrue(ri.ns_name.endswith(id)) @@ -458,7 +304,7 @@ class TestBasicRouterOperations(base.BaseTestCase): 'routes': [], 'gw_port': ex_gw_port} ns = "ns-" + id - ri = l3_agent.RouterInfo(id, self.conf.root_helper, router, ns_name=ns) + ri = l3router.RouterInfo(id, self.conf.root_helper, router, ns_name=ns) self.assertTrue(ri.ns_name.endswith(id)) self.assertEqual(ri.router, router) @@ -529,7 +375,7 @@ class TestBasicRouterOperations(base.BaseTestCase): def _test_external_gateway_action(self, action, router): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router, ns_name=agent.get_ns_name(router['id'])) # Special setup for dvr routers @@ -600,7 +446,7 @@ class TestBasicRouterOperations(base.BaseTestCase): def test_external_gateway_updated(self): router = prepare_router_data(num_internal_ports=2) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router, ns_name=agent.get_ns_name(router['id'])) interface_name, ex_gw_port = self._prepare_ext_gw_test(agent) @@ -627,7 +473,7 @@ class TestBasicRouterOperations(base.BaseTestCase): def _test_ext_gw_updated_dvr_agent_mode(self, host, agent_mode, expected_call_count): router = prepare_router_data(num_internal_ports=2) - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) interface_name, ex_gw_port = self._prepare_ext_gw_test(agent) @@ -672,7 +518,7 @@ class TestBasicRouterOperations(base.BaseTestCase): self.conf.set_override('use_namespaces', False) router_id = _uuid() - ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, {}) + ri = l3router.RouterInfo(router_id, self.conf.root_helper, {}) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) floating_ip = '20.0.0.101' interface_name = agent.get_external_device_name(router_id) @@ -711,7 +557,7 @@ class TestBasicRouterOperations(base.BaseTestCase): self.conf.set_override('use_namespaces', False) router_id = _uuid() - ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, {}) + ri = l3router.RouterInfo(router_id, self.conf.root_helper, {}) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) fake_route1 = {'destination': '135.207.0.0/16', @@ -757,7 +603,7 @@ class TestBasicRouterOperations(base.BaseTestCase): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router_id = _uuid() - ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, {}) + ri = l3router.RouterInfo(router_id, self.conf.root_helper, {}) ri.router = {} fake_old_routes = [] @@ -815,7 +661,7 @@ class TestBasicRouterOperations(base.BaseTestCase): def test__map_internal_interfaces(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data(num_internal_ports=4) - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) test_port = { 'mac_address': '00:12:23:34:45:56', @@ -841,7 +687,7 @@ class TestBasicRouterOperations(base.BaseTestCase): router = prepare_router_data(num_internal_ports=4) subnet_ids = [_get_subnet_id(port) for port in router[l3_constants.INTERFACE_KEY]] - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) # Test Basic cases @@ -870,7 +716,7 @@ class TestBasicRouterOperations(base.BaseTestCase): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data(num_internal_ports=2) router['distributed'] = True - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) ports = ri.router.get(l3_constants.INTERFACE_KEY, []) test_ports = [{'mac_address': '00:11:22:33:44:55', @@ -921,7 +767,7 @@ class TestBasicRouterOperations(base.BaseTestCase): def test__update_arp_entry_with_no_subnet(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) - ri = l3_agent.RouterInfo( + ri = l3router.RouterInfo( 'foo_router_id', mock.ANY, {'distributed': True, 'gw_port_host': HOSTNAME}) with mock.patch.object(l3_agent.ip_lib, 'IPDevice') as f: @@ -949,13 +795,13 @@ class TestBasicRouterOperations(base.BaseTestCase): def test_process_cent_router(self): router = prepare_router_data() - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) self._test_process_router(ri) def test_process_dist_router(self): router = prepare_router_data() - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) subnet_id = _get_subnet_id(router[l3_constants.INTERFACE_KEY][0]) ri.router['distributed'] = True @@ -1047,7 +893,7 @@ class TestBasicRouterOperations(base.BaseTestCase): router['routes'] = [ {'destination': '8.8.8.8/32', 'nexthop': '35.4.0.10'}, {'destination': '8.8.4.4/32', 'nexthop': '35.4.0.11'}] - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) ri.router = router with contextlib.nested(mock.patch.object(agent, @@ -1104,7 +950,7 @@ vrrp_instance VR_1 { device.addr.list.return_value = [] ri.iptables_manager.ipv4['nat'] = mock.MagicMock() - with mock.patch.object(l3_agent.LinkLocalAllocator, '_write'): + with mock.patch.object(lla.LinkLocalAllocator, '_write'): fip_statuses = agent.process_router_floating_ip_addresses( ri, {'id': _uuid()}) self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE}, @@ -1141,7 +987,7 @@ vrrp_instance VR_1 { router = prepare_router_data(enable_snat=True) router[l3_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips'] - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) ri.iptables_manager.ipv4['nat'] = mock.MagicMock() agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) @@ -1165,7 +1011,7 @@ vrrp_instance VR_1 { router = prepare_router_data(enable_snat=True) router[l3_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips'] router['distributed'] = True - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) ri.iptables_manager.ipv4['nat'] = mock.MagicMock() agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) @@ -1288,7 +1134,7 @@ vrrp_instance VR_1 { def test_process_router_snat_disabled(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data(enable_snat=True) - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() # Process with NAT @@ -1310,7 +1156,7 @@ vrrp_instance VR_1 { def test_process_router_snat_enabled(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data(enable_snat=False) - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() # Process without NAT @@ -1332,7 +1178,7 @@ vrrp_instance VR_1 { def test_process_router_interface_added(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data() - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() # Process with NAT @@ -1351,7 +1197,7 @@ vrrp_instance VR_1 { # Get NAT rules without the gw_port gw_port = router['gw_port'] router['gw_port'] = None - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() agent.process_router(ri) @@ -1359,7 +1205,7 @@ vrrp_instance VR_1 { # Get NAT rules with the gw_port router['gw_port'] = gw_port - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) with mock.patch.object( agent, @@ -1374,7 +1220,7 @@ vrrp_instance VR_1 { def _process_router_ipv6_interface_added( self, router, ra_mode=None, addr_mode=None): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() # Process with NAT @@ -1432,7 +1278,7 @@ vrrp_instance VR_1 { def test_process_router_ipv6v4_interface_added(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data() - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() # Process with NAT @@ -1448,7 +1294,7 @@ vrrp_instance VR_1 { def test_process_router_interface_removed(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data(num_internal_ports=2) - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() # Process with NAT @@ -1464,7 +1310,7 @@ vrrp_instance VR_1 { def test_process_router_ipv6_interface_removed(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data() - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() ri.router = router @@ -1483,7 +1329,7 @@ vrrp_instance VR_1 { def test_process_router_internal_network_added_unexpected_error(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data() - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() with mock.patch.object( @@ -1509,7 +1355,7 @@ vrrp_instance VR_1 { def test_process_router_internal_network_removed_unexpected_error(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data() - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() # add an internal port @@ -1550,7 +1396,7 @@ vrrp_instance VR_1 { 'fixed_ip_address': '7.7.7.7', 'port_id': router[l3_constants.INTERFACE_KEY][0]['id']}] - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() agent.process_router(ri) @@ -1583,7 +1429,7 @@ vrrp_instance VR_1 { 'fixed_ip_address': '7.7.7.7', 'port_id': router[l3_constants.INTERFACE_KEY][0]['id']}] - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent.external_gateway_added = mock.Mock() agent.process_router(ri) @@ -1594,7 +1440,7 @@ vrrp_instance VR_1 { {fip_id: l3_constants.FLOATINGIP_STATUS_ERROR}) def test_handle_router_snat_rules_distributed_without_snat_manager(self): - ri = l3_agent.RouterInfo( + ri = l3router.RouterInfo( 'foo_router_id', mock.ANY, {'distributed': True}) ri.iptables_manager = mock.Mock() @@ -1626,7 +1472,7 @@ vrrp_instance VR_1 { def test_handle_router_snat_rules_add_rules(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) - ri = l3_agent.RouterInfo(_uuid(), self.conf.root_helper, {}) + ri = l3router.RouterInfo(_uuid(), self.conf.root_helper, {}) ex_gw_port = {'fixed_ips': [{'ip_address': '192.168.1.4'}]} ri.router = {'distributed': False} agent._handle_router_snat_rules(ri, ex_gw_port, @@ -1657,7 +1503,7 @@ vrrp_instance VR_1 { self.mock_ip.get_devices.return_value = get_devices_return router = prepare_router_data(enable_snat=True, num_internal_ports=1) - ri = l3_agent.RouterInfo(router['id'], + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) @@ -1700,7 +1546,7 @@ vrrp_instance VR_1 { router = prepare_router_data(enable_snat=True, num_internal_ports=1) del router['gw_port'] - ri = l3_agent.RouterInfo(router['id'], + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) @@ -2030,7 +1876,7 @@ vrrp_instance VR_1 { def test_create_dvr_gateway(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data() - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) port_id = _uuid() @@ -2091,14 +1937,14 @@ vrrp_instance VR_1 { 'floating_network_id': _uuid(), 'port_id': _uuid()} - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) rtr_2_fip_name = agent.get_rtr_int_device_name(ri.router_id) fip_2_rtr_name = agent.get_fip_int_device_name(ri.router_id) fip_ns_name = agent.get_fip_ns_name(str(fip['floating_network_id'])) - with mock.patch.object(l3_agent.LinkLocalAllocator, '_write'): + with mock.patch.object(lla.LinkLocalAllocator, '_write'): self.device_exists.return_value = False agent.create_rtr_2_fip_link(ri, fip['floating_network_id']) self.mock_ip.add_veth.assert_called_with(rtr_2_fip_name, @@ -2113,17 +1959,17 @@ vrrp_instance VR_1 { agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data() - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) self.device_exists.return_value = True - with mock.patch.object(l3_agent.LinkLocalAllocator, '_write'): + with mock.patch.object(lla.LinkLocalAllocator, '_write'): agent.create_rtr_2_fip_link(ri, {}) self.assertFalse(self.mock_ip.add_veth.called) def test_floating_ip_added_dist(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data() - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) agent_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30', 'subnet_id': _uuid()}], @@ -2140,14 +1986,14 @@ vrrp_instance VR_1 { 'floating_network_id': _uuid(), 'port_id': _uuid()} agent.agent_gateway_port = agent_gw_port - ri.rtr_fip_subnet = l3_agent.LinkLocalAddressPair('169.254.30.42/31') + ri.rtr_fip_subnet = lla.LinkLocalAddressPair('169.254.30.42/31') agent.floating_ip_added_dist(ri, fip) self.mock_rule.add_rule_from.assert_called_with('192.168.0.1', 16, FIP_PRI) # TODO(mrsmith): add more asserts @mock.patch.object(l3_agent.L3NATAgent, '_fip_ns_unsubscribe') - @mock.patch.object(l3_agent.LinkLocalAllocator, '_write') + @mock.patch.object(lla.LinkLocalAllocator, '_write') def test_floating_ip_removed_dist(self, write, unsubscribe): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = prepare_router_data() @@ -2160,7 +2006,7 @@ vrrp_instance VR_1 { 'ip_cidr': '20.0.0.30/24'} fip_cidr = '11.22.33.44/24' - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router=router) ri.dist_fip_count = 2 agent.fip_ns_subscribers.add(ri.router_id) @@ -2168,7 +2014,7 @@ vrrp_instance VR_1 { ri.fip_2_rtr = '11.22.33.42' ri.rtr_2_fip = '11.22.33.40' agent.agent_gateway_port = agent_gw_port - s = l3_agent.LinkLocalAddressPair('169.254.30.42/31') + s = lla.LinkLocalAddressPair('169.254.30.42/31') ri.rtr_fip_subnet = s agent.floating_ip_removed_dist(ri, fip_cidr) self.mock_rule.delete_rule_priority.assert_called_with(FIP_PRI) @@ -2278,7 +2124,7 @@ vrrp_instance VR_1 { router['distributed'] = True router['gw_port_host'] = HOSTNAME - ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, router) + ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router) vm_floating_ip = '19.4.4.2' ri.floating_ips_dict[vm_floating_ip] = FIP_PRI ri.dist_fip_count = 1 @@ -2321,7 +2167,7 @@ class TestL3AgentEventHandler(base.BaseTestCase): def setUp(self): super(TestL3AgentEventHandler, self).setUp() cfg.CONF.register_opts(l3_agent.L3NATAgent.OPTS) - cfg.CONF.register_opts(l3_ha_agent.OPTS) + cfg.CONF.register_opts(ha.OPTS) agent_config.register_interface_driver_opts_helper(cfg.CONF) agent_config.register_use_namespaces_opts_helper(cfg.CONF) cfg.CONF.set_override( @@ -2347,7 +2193,7 @@ class TestL3AgentEventHandler(base.BaseTestCase): driver_cls.return_value = mock_driver l3_plugin_p = mock.patch( - 'neutron.agent.l3_agent.L3PluginApi') + 'neutron.agent.l3.agent.L3PluginApi') l3_plugin_cls = l3_plugin_p.start() l3_plugin_cls.return_value = mock.MagicMock() @@ -2370,7 +2216,7 @@ class TestL3AgentEventHandler(base.BaseTestCase): cfg.CONF.set_override('debug', True) self.external_process_p.stop() - ri = l3_agent.RouterInfo(router_id, None, None) + ri = l3router.RouterInfo(router_id, None, None) try: with mock.patch(ip_class_path) as ip_mock: self.agent._spawn_metadata_proxy(ri.router_id, ri.ns_name) diff --git a/neutron/tests/unit/test_l3_dvr.py b/neutron/tests/unit/test_l3_dvr.py new file mode 100644 index 0000000000..89ad856f1b --- /dev/null +++ b/neutron/tests/unit/test_l3_dvr.py @@ -0,0 +1,96 @@ +# Copyright 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import netaddr + +from neutron.agent.l3 import link_local_allocator as lla +from neutron.tests import base + + +class TestLinkLocalAddrAllocator(base.BaseTestCase): + def setUp(self): + super(TestLinkLocalAddrAllocator, self).setUp() + self.subnet = netaddr.IPNetwork('169.254.31.0/24') + + def test__init__(self): + a = lla.LinkLocalAllocator('/file', self.subnet.cidr) + self.assertEqual('/file', a.state_file) + self.assertEqual({}, a.allocations) + + def test__init__readfile(self): + with mock.patch.object(lla.LinkLocalAllocator, '_read') as read: + read.return_value = ["da873ca2,169.254.31.28/31\n"] + a = lla.LinkLocalAllocator('/file', self.subnet.cidr) + + self.assertTrue('da873ca2' in a.remembered) + self.assertEqual({}, a.allocations) + + def test_allocate(self): + a = lla.LinkLocalAllocator('/file', self.subnet.cidr) + with mock.patch.object(lla.LinkLocalAllocator, '_write') as write: + subnet = a.allocate('deadbeef') + + self.assertTrue('deadbeef' in a.allocations) + self.assertTrue(subnet not in a.pool) + self._check_allocations(a.allocations) + write.assert_called_once_with(['deadbeef,%s\n' % subnet.cidr]) + + def test_allocate_from_file(self): + with mock.patch.object(lla.LinkLocalAllocator, '_read') as read: + read.return_value = ["deadbeef,169.254.31.88/31\n"] + a = lla.LinkLocalAllocator('/file', self.subnet.cidr) + + with mock.patch.object(lla.LinkLocalAllocator, '_write') as write: + subnet = a.allocate('deadbeef') + + self.assertEqual(netaddr.IPNetwork('169.254.31.88/31'), subnet) + self.assertTrue(subnet not in a.pool) + self._check_allocations(a.allocations) + self.assertFalse(write.called) + + def test_allocate_exhausted_pool(self): + subnet = netaddr.IPNetwork('169.254.31.0/31') + with mock.patch.object(lla.LinkLocalAllocator, '_read') as read: + read.return_value = ["deadbeef,169.254.31.0/31\n"] + a = lla.LinkLocalAllocator('/file', subnet.cidr) + + with mock.patch.object(lla.LinkLocalAllocator, '_write') as write: + allocation = a.allocate('abcdef12') + + self.assertEqual(subnet, allocation) + self.assertFalse('deadbeef' in a.allocations) + self.assertTrue('abcdef12' in a.allocations) + self.assertTrue(allocation not in a.pool) + self._check_allocations(a.allocations) + write.assert_called_once_with(['abcdef12,%s\n' % allocation.cidr]) + + self.assertRaises(RuntimeError, a.allocate, 'deadbeef') + + def test_release(self): + with mock.patch.object(lla.LinkLocalAllocator, '_write') as write: + a = lla.LinkLocalAllocator('/file', self.subnet.cidr) + subnet = a.allocate('deadbeef') + write.reset_mock() + a.release('deadbeef') + + self.assertTrue('deadbeef' not in a.allocations) + self.assertTrue(subnet in a.pool) + self.assertEqual({}, a.allocations) + write.assert_called_once_with([]) + + def _check_allocations(self, allocations): + for key, subnet in allocations.items(): + self.assertTrue(subnet in self.subnet) + self.assertEqual(subnet.prefixlen, 31) diff --git a/neutron/tests/unit/test_router_processing_queue.py b/neutron/tests/unit/test_router_processing_queue.py new file mode 100644 index 0000000000..0f7bfcdc32 --- /dev/null +++ b/neutron/tests/unit/test_router_processing_queue.py @@ -0,0 +1,102 @@ +# Copyright 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import datetime + +from neutron.agent.l3 import router_processing_queue as l3_queue +from neutron.openstack.common import uuidutils +from neutron.tests import base + +_uuid = uuidutils.generate_uuid +FAKE_ID = _uuid() +FAKE_ID_2 = _uuid() + + +class TestExclusiveRouterProcessor(base.BaseTestCase): + def setUp(self): + super(TestExclusiveRouterProcessor, self).setUp() + + def test_i_am_master(self): + master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2) + not_master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2) + + self.assertTrue(master._i_am_master()) + self.assertFalse(not_master._i_am_master()) + self.assertTrue(master_2._i_am_master()) + self.assertFalse(not_master_2._i_am_master()) + + master.__exit__(None, None, None) + master_2.__exit__(None, None, None) + + def test_master(self): + master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2) + not_master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2) + + self.assertEqual(master._master, master) + self.assertEqual(not_master._master, master) + self.assertEqual(master_2._master, master_2) + self.assertEqual(not_master_2._master, master_2) + + master.__exit__(None, None, None) + master_2.__exit__(None, None, None) + + def test__enter__(self): + self.assertFalse(FAKE_ID in l3_queue.ExclusiveRouterProcessor._masters) + master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + master.__enter__() + self.assertTrue(FAKE_ID in l3_queue.ExclusiveRouterProcessor._masters) + master.__exit__(None, None, None) + + def test__exit__(self): + master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + master.__enter__() + self.assertTrue(FAKE_ID in l3_queue.ExclusiveRouterProcessor._masters) + not_master.__enter__() + not_master.__exit__(None, None, None) + self.assertTrue(FAKE_ID in l3_queue.ExclusiveRouterProcessor._masters) + master.__exit__(None, None, None) + self.assertFalse(FAKE_ID in l3_queue.ExclusiveRouterProcessor._masters) + + def test_data_fetched_since(self): + master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + self.assertEqual(master._get_router_data_timestamp(), + datetime.datetime.min) + + ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) + ts2 = datetime.datetime.utcnow() + + master.fetched_and_processed(ts2) + self.assertEqual(master._get_router_data_timestamp(), ts2) + master.fetched_and_processed(ts1) + self.assertEqual(master._get_router_data_timestamp(), ts2) + + master.__exit__(None, None, None) + + def test_updates(self): + master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + + master.queue_update(l3_queue.RouterUpdate(FAKE_ID, 0)) + not_master.queue_update(l3_queue.RouterUpdate(FAKE_ID, 0)) + + for update in not_master.updates(): + raise Exception("Only the master should process a router") + + self.assertEqual(2, len([i for i in master.updates()])) diff --git a/setup.cfg b/setup.cfg index 83cfc9aeeb..2feb253afb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -103,7 +103,7 @@ console_scripts = neutron-dhcp-agent = neutron.agent.dhcp_agent:main neutron-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main neutron-ibm-agent = neutron.plugins.ibm.agent.sdnve_neutron_agent:main - neutron-l3-agent = neutron.agent.l3_agent:main + neutron-l3-agent = neutron.agent.l3.agent:main neutron-lbaas-agent = neutron.services.loadbalancer.agent.agent:main neutron-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main neutron-metadata-agent = neutron.agent.metadata.agent:main