diff --git a/neutron/agent/l3/router_processing_queue.py b/neutron/agent/common/resource_processing_queue.py similarity index 58% rename from neutron/agent/l3/router_processing_queue.py rename to neutron/agent/common/resource_processing_queue.py index 42445af5349..e153f91eefc 100644 --- a/neutron/agent/l3/router_processing_queue.py +++ b/neutron/agent/common/resource_processing_queue.py @@ -35,21 +35,24 @@ RELATED_ACTION_MAP = {DELETE_ROUTER: DELETE_RELATED_ROUTER, ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER} -class RouterUpdate(object): - """Encapsulates a router update +class ResourceUpdate(object): + """Encapsulates a resource update An instance of this object carries the information necessary to prioritize - and process a request to update a router. + and process a request to update a resource. + + Priority values are ordered from higher (0) to lower (>0) by the caller, + and are therefore not defined here, but must be done by the consumer. """ - def __init__(self, router_id, priority, - action=None, router=None, timestamp=None, tries=5): + def __init__(self, id, priority, + action=None, resource=None, timestamp=None, tries=5): self.priority = priority self.timestamp = timestamp if not timestamp: self.timestamp = timeutils.utcnow() - self.id = router_id + self.id = id self.action = action - self.router = router + self.resource = resource self.tries = tries def __lt__(self, other): @@ -71,40 +74,40 @@ class RouterUpdate(object): return self.tries < 0 -class ExclusiveRouterProcessor(object): - """Manager for access to a router for processing +class ExclusiveResourceProcessor(object): + """Manager for access to a resource for processing - This class controls access to a router in a non-blocking way. The first - instance to be created for a given router_id is granted exclusive access to - the router. + This class controls access to a resource in a non-blocking way. The first + instance to be created for a given ID is granted exclusive access to + the resource. - Other instances may be created for the same router_id while the first + Other instances may be created for the same ID while the first instance has exclusive access. If that happens then it doesn't block and wait for access. Instead, it signals to the master instance that an update came in with the timestamp. - This way, a thread will not block to wait for access to a router. Instead - it effectively signals to the thread that is working on the router that - something has changed since it started working on it. That thread will - simply finish its current iteration and then repeat. + This way, a thread will not block to wait for access to a resource. + Instead it effectively signals to the thread that is working on the + resource that something has changed since it started working on it. + That thread will simply finish its current iteration and then repeat. - This class keeps track of the last time that a router data was fetched and + This class keeps track of the last time that resource data was fetched and processed. The timestamp that it keeps must be before when the data used - to process the router last was fetched from the database. But, as close as - possible. The timestamp should not be recorded, however, until the router - has been processed using the fetch data. + to process the resource last was fetched from the database. But, as close + as possible. The timestamp should not be recorded, however, until the + resource has been processed using the fetch data. """ _masters = {} - _router_timestamps = {} + _resource_timestamps = {} - def __init__(self, router_id): - self._router_id = router_id + def __init__(self, id): + self._id = id - if router_id not in self._masters: - self._masters[router_id] = self + if id not in self._masters: + self._masters[id] = self self._queue = [] - self._master = self._masters[router_id] + self._master = self._masters[id] def _i_am_master(self): return self == self._master @@ -114,44 +117,44 @@ class ExclusiveRouterProcessor(object): def __exit__(self, type, value, traceback): if self._i_am_master(): - del self._masters[self._router_id] + del self._masters[self._id] - def _get_router_data_timestamp(self): - return self._router_timestamps.get(self._router_id, - datetime.datetime.min) + def _get_resource_data_timestamp(self): + return self._resource_timestamps.get(self._id, + datetime.datetime.min) def fetched_and_processed(self, timestamp): - """Records the data timestamp after it is used to update the router""" - new_timestamp = max(timestamp, self._get_router_data_timestamp()) - self._router_timestamps[self._router_id] = new_timestamp + """Records the timestamp after it is used to update the resource""" + new_timestamp = max(timestamp, self._get_resource_data_timestamp()) + self._resource_timestamps[self._id] = new_timestamp def queue_update(self, update): """Queues an update from a worker - This is the queue used to keep new updates that come in while a router - is being processed. These updates have already bubbled to the front of - the RouterProcessingQueue. + This is the queue used to keep new updates that come in while a + resource is being processed. These updates have already bubbled to + the front of the ResourceProcessingQueue. """ self._master._queue.append(update) def updates(self): - """Processes the router until updates stop coming + """Processes the resource until updates stop coming - Only the master instance will process the router. However, updates may - come in from other workers while it is in progress. This method loops - until they stop coming. + Only the master instance will process the resource. However, updates + may come in from other workers while it is in progress. This method + loops until they stop coming. """ if self._i_am_master(): while self._queue: # Remove the update from the queue even if it is old. update = self._queue.pop(0) # Process the update only if it is fresh. - if self._get_router_data_timestamp() < update.timestamp: + if self._get_resource_data_timestamp() < update.timestamp: yield update -class RouterProcessingQueue(object): - """Manager of the queue of routers to process.""" +class ResourceProcessingQueue(object): + """Manager of the queue of resources to process.""" def __init__(self): self._queue = Queue.PriorityQueue() @@ -159,15 +162,15 @@ class RouterProcessingQueue(object): update.tries -= 1 self._queue.put(update) - def each_update_to_next_router(self): - """Grabs the next router from the queue and processes + def each_update_to_next_resource(self): + """Grabs the next resource from the queue and processes - This method uses a for loop to process the router repeatedly until + This method uses a for loop to process the resource repeatedly until updates stop bubbling to the front of the queue. """ next_update = self._queue.get() - with ExclusiveRouterProcessor(next_update.id) as rp: + with ExclusiveResourceProcessor(next_update.id) as rp: # Queue the update whether this worker is the master or not. rp.queue_update(next_update) diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py index 5832e91ae6c..3896d41b679 100644 --- a/neutron/agent/l3/agent.py +++ b/neutron/agent/l3/agent.py @@ -33,6 +33,7 @@ from oslo_utils import timeutils from osprofiler import profiler from neutron._i18n import _ +from neutron.agent.common import resource_processing_queue as queue from neutron.agent.common import utils as common_utils from neutron.agent.l3 import dvr from neutron.agent.l3 import dvr_edge_ha_router @@ -44,7 +45,6 @@ from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api from neutron.agent.l3 import l3_agent_extensions_manager as l3_ext_manager from neutron.agent.l3 import legacy_router from neutron.agent.l3 import namespace_manager -from neutron.agent.l3 import router_processing_queue as queue from neutron.agent.linux import external_process from neutron.agent.linux import ip_lib from neutron.agent.linux import pd @@ -239,7 +239,7 @@ class L3NATAgent(ha.AgentMixin, self.driver, self.metadata_driver) - self._queue = queue.RouterProcessingQueue() + self._queue = queue.ResourceProcessingQueue() super(L3NATAgent, self).__init__(host=self.conf.host) self.target_ex_net_id = None @@ -418,9 +418,9 @@ class L3NATAgent(ha.AgentMixin, def router_deleted(self, context, router_id): """Deal with router deletion RPC message.""" LOG.debug('Got router deleted notification for %s', router_id) - update = queue.RouterUpdate(router_id, - queue.PRIORITY_RPC, - action=queue.DELETE_ROUTER) + update = queue.ResourceUpdate(router_id, + queue.PRIORITY_RPC, + action=queue.DELETE_ROUTER) self._queue.add(update) def routers_updated(self, context, routers): @@ -431,16 +431,16 @@ class L3NATAgent(ha.AgentMixin, if isinstance(routers[0], dict): routers = [router['id'] for router in routers] for id in routers: - update = queue.RouterUpdate( + update = queue.ResourceUpdate( id, queue.PRIORITY_RPC, action=queue.ADD_UPDATE_ROUTER) self._queue.add(update) def router_removed_from_agent(self, context, payload): LOG.debug('Got router removed from agent :%r', payload) router_id = payload['router_id'] - update = queue.RouterUpdate(router_id, - queue.PRIORITY_RPC, - action=queue.DELETE_ROUTER) + update = queue.ResourceUpdate(router_id, + queue.PRIORITY_RPC, + action=queue.DELETE_ROUTER) self._queue.add(update) def router_added_to_agent(self, context, payload): @@ -455,7 +455,7 @@ class L3NATAgent(ha.AgentMixin, ports.append(ri.ex_gw_port) port_belongs = lambda p: p['network_id'] == network_id if any(port_belongs(p) for p in ports): - update = queue.RouterUpdate( + update = queue.ResourceUpdate( ri.router_id, queue.PRIORITY_SYNC_ROUTERS_TASK) self._resync_router(update) @@ -541,11 +541,11 @@ class L3NATAgent(ha.AgentMixin, return router_update.timestamp = timeutils.utcnow() router_update.priority = priority - router_update.router = None # Force the agent to resync the router + router_update.resource = None # Force the agent to resync the router self._queue.add(router_update) def _process_router_update(self): - for rp, update in self._queue.each_update_to_next_router(): + for rp, update in self._queue.each_update_to_next_resource(): LOG.debug("Starting router update for %s, action %s, priority %s", update.id, update.action, update.priority) if update.action == queue.PD_UPDATE: @@ -553,7 +553,7 @@ class L3NATAgent(ha.AgentMixin, LOG.debug("Finished a router update for %s", update.id) continue - routers = [update.router] if update.router else [] + routers = [update.resource] if update.resource else [] not_delete_no_routers = (update.action != queue.DELETE_ROUTER and not routers) @@ -605,7 +605,7 @@ class L3NATAgent(ha.AgentMixin, # on it already and we don't want to race. new_action = queue.RELATED_ACTION_MAP.get( update.action, queue.ADD_UPDATE_RELATED_ROUTER) - new_update = queue.RouterUpdate( + new_update = queue.ResourceUpdate( router['id'], priority=queue.PRIORITY_RELATED_ROUTER, action=new_action) @@ -697,10 +697,10 @@ class L3NATAgent(ha.AgentMixin, ns_manager.keep_ext_net(ext_net_id) elif is_snat_agent and not r.get('ha'): ns_manager.ensure_snat_cleanup(r['id']) - update = queue.RouterUpdate( + update = queue.ResourceUpdate( r['id'], queue.PRIORITY_SYNC_ROUTERS_TASK, - router=r, + resource=r, action=queue.ADD_UPDATE_ROUTER, timestamp=timestamp) self._queue.add(update) @@ -736,10 +736,10 @@ class L3NATAgent(ha.AgentMixin, # Delete routers that have disappeared since the last sync for router_id in prev_router_ids - curr_router_ids: ns_manager.keep_router(router_id) - update = queue.RouterUpdate(router_id, - queue.PRIORITY_SYNC_ROUTERS_TASK, - timestamp=timestamp, - action=queue.DELETE_ROUTER) + update = queue.ResourceUpdate(router_id, + queue.PRIORITY_SYNC_ROUTERS_TASK, + timestamp=timestamp, + action=queue.DELETE_ROUTER) self._queue.add(update) @property @@ -760,10 +760,10 @@ class L3NATAgent(ha.AgentMixin, def create_pd_router_update(self): router_id = None - update = queue.RouterUpdate(router_id, - queue.PRIORITY_PD_UPDATE, - timestamp=timeutils.utcnow(), - action=queue.PD_UPDATE) + update = queue.ResourceUpdate(router_id, + queue.PRIORITY_PD_UPDATE, + timestamp=timeutils.utcnow(), + action=queue.PD_UPDATE) self._queue.add(update) diff --git a/neutron/tests/unit/agent/l3/test_router_processing_queue.py b/neutron/tests/unit/agent/common/test_resource_processing_queue.py similarity index 53% rename from neutron/tests/unit/agent/l3/test_router_processing_queue.py rename to neutron/tests/unit/agent/common/test_resource_processing_queue.py index 5ae3ad923e3..1a223e9aa2a 100644 --- a/neutron/tests/unit/agent/l3/test_router_processing_queue.py +++ b/neutron/tests/unit/agent/common/test_resource_processing_queue.py @@ -17,21 +17,23 @@ import datetime from oslo_utils import uuidutils -from neutron.agent.l3 import router_processing_queue as l3_queue +from neutron.agent.common import resource_processing_queue as queue from neutron.tests import base _uuid = uuidutils.generate_uuid FAKE_ID = _uuid() FAKE_ID_2 = _uuid() +PRIORITY_RPC = 0 -class TestExclusiveRouterProcessor(base.BaseTestCase): + +class TestExclusiveResourceProcessor(base.BaseTestCase): def test_i_am_master(self): - master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) - not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) - master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2) - not_master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2) + master = queue.ExclusiveResourceProcessor(FAKE_ID) + not_master = queue.ExclusiveResourceProcessor(FAKE_ID) + master_2 = queue.ExclusiveResourceProcessor(FAKE_ID_2) + not_master_2 = queue.ExclusiveResourceProcessor(FAKE_ID_2) self.assertTrue(master._i_am_master()) self.assertFalse(not_master._i_am_master()) @@ -42,10 +44,10 @@ class TestExclusiveRouterProcessor(base.BaseTestCase): master_2.__exit__(None, None, None) def test_master(self): - master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) - not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) - master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2) - not_master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2) + master = queue.ExclusiveResourceProcessor(FAKE_ID) + not_master = queue.ExclusiveResourceProcessor(FAKE_ID) + master_2 = queue.ExclusiveResourceProcessor(FAKE_ID_2) + not_master_2 = queue.ExclusiveResourceProcessor(FAKE_ID_2) self.assertEqual(master, master._master) self.assertEqual(master, not_master._master) @@ -56,56 +58,55 @@ class TestExclusiveRouterProcessor(base.BaseTestCase): master_2.__exit__(None, None, None) def test__enter__(self): - self.assertNotIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters) - master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + self.assertNotIn(FAKE_ID, queue.ExclusiveResourceProcessor._masters) + master = queue.ExclusiveResourceProcessor(FAKE_ID) master.__enter__() - self.assertIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters) + self.assertIn(FAKE_ID, queue.ExclusiveResourceProcessor._masters) master.__exit__(None, None, None) def test__exit__(self): - master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) - not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + master = queue.ExclusiveResourceProcessor(FAKE_ID) + not_master = queue.ExclusiveResourceProcessor(FAKE_ID) master.__enter__() - self.assertIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters) + self.assertIn(FAKE_ID, queue.ExclusiveResourceProcessor._masters) not_master.__enter__() not_master.__exit__(None, None, None) - self.assertIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters) + self.assertIn(FAKE_ID, queue.ExclusiveResourceProcessor._masters) master.__exit__(None, None, None) - self.assertNotIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters) + self.assertNotIn(FAKE_ID, queue.ExclusiveResourceProcessor._masters) def test_data_fetched_since(self): - master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + master = queue.ExclusiveResourceProcessor(FAKE_ID) self.assertEqual(datetime.datetime.min, - master._get_router_data_timestamp()) + master._get_resource_data_timestamp()) ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) ts2 = datetime.datetime.utcnow() master.fetched_and_processed(ts2) - self.assertEqual(ts2, master._get_router_data_timestamp()) + self.assertEqual(ts2, master._get_resource_data_timestamp()) master.fetched_and_processed(ts1) - self.assertEqual(ts2, master._get_router_data_timestamp()) + self.assertEqual(ts2, master._get_resource_data_timestamp()) master.__exit__(None, None, None) def test_updates(self): - master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) - not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID) + master = queue.ExclusiveResourceProcessor(FAKE_ID) + not_master = queue.ExclusiveResourceProcessor(FAKE_ID) - master.queue_update(l3_queue.RouterUpdate(FAKE_ID, 0)) - not_master.queue_update(l3_queue.RouterUpdate(FAKE_ID, 0)) + master.queue_update(queue.ResourceUpdate(FAKE_ID, 0)) + not_master.queue_update(queue.ResourceUpdate(FAKE_ID, 0)) for update in not_master.updates(): - raise Exception("Only the master should process a router") + raise Exception("Only the master should process a resource") self.assertEqual(2, len([i for i in master.updates()])) def test_hit_retry_limit(self): tries = 1 - queue = l3_queue.RouterProcessingQueue() - update = l3_queue.RouterUpdate(FAKE_ID, l3_queue.PRIORITY_RPC, - tries=tries) - queue.add(update) + rpqueue = queue.ResourceProcessingQueue() + update = queue.ResourceUpdate(FAKE_ID, PRIORITY_RPC, tries=tries) + rpqueue.add(update) self.assertFalse(update.hit_retry_limit()) - queue.add(update) + rpqueue.add(update) self.assertTrue(update.hit_retry_limit()) diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py index ea7a4c3f613..dbce37325d0 100644 --- a/neutron/tests/unit/agent/l3/test_agent.py +++ b/neutron/tests/unit/agent/l3/test_agent.py @@ -32,6 +32,7 @@ from oslo_utils import timeutils from oslo_utils import uuidutils from testtools import matchers +from neutron.agent.common import resource_processing_queue from neutron.agent.l3 import agent as l3_agent from neutron.agent.l3 import dvr_edge_router as dvr_router from neutron.agent.l3 import dvr_router_base @@ -42,7 +43,6 @@ from neutron.agent.l3 import link_local_allocator as lla from neutron.agent.l3 import namespace_manager from neutron.agent.l3 import namespaces from neutron.agent.l3 import router_info as l3router -from neutron.agent.l3 import router_processing_queue from neutron.agent.linux import dibbler from neutron.agent.linux import interface from neutron.agent.linux import iptables_manager @@ -2094,10 +2094,10 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): router = {'id': _uuid(), 'external_gateway_info': {'network_id': 'aaa'}} self.plugin_api.get_routers.return_value = [router] - update = router_processing_queue.RouterUpdate( + update = resource_processing_queue.ResourceUpdate( router['id'], - router_processing_queue.PRIORITY_SYNC_ROUTERS_TASK, - router=router, + resource_processing_queue.PRIORITY_SYNC_ROUTERS_TASK, + resource=router, timestamp=timeutils.utcnow()) agent._queue.add(update) @@ -2572,8 +2572,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): agent._resync_router = mock.Mock() update = mock.Mock() update.id = router_id - update.router = None - agent._queue.each_update_to_next_router.side_effect = [ + update.resource = None + agent._queue.each_update_to_next_resource.side_effect = [ [(None, update)]] agent._process_router_update() self.assertFalse(agent.fullsync) @@ -2596,10 +2596,10 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): agent._process_router_if_compatible.side_effect = RuntimeError() # Queue an update from a full sync - update = router_processing_queue.RouterUpdate( + update = resource_processing_queue.ResourceUpdate( router_id, - router_processing_queue.PRIORITY_SYNC_ROUTERS_TASK, - router=router, + resource_processing_queue.PRIORITY_SYNC_ROUTERS_TASK, + resource=router, timestamp=timeutils.utcnow()) agent._queue.add(update) agent._process_router_update() @@ -2621,12 +2621,12 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) agent._queue = mock.Mock() update = mock.Mock() - update.router = None + update.resource = None update.action = 1 # ROUTER_DELETED router_info = mock.MagicMock() agent.router_info[update.id] = router_info router_processor = mock.Mock() - agent._queue.each_update_to_next_router.side_effect = [ + agent._queue.each_update_to_next_resource.side_effect = [ [(router_processor, update)]] agent._resync_router = mock.Mock() if error: @@ -2657,8 +2657,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): related_router = {'id': _uuid()} routers = [router, related_router] self.plugin_api.get_routers.return_value = routers - update = router_processing_queue.RouterUpdate( - router['id'], router_processing_queue.PRIORITY_RPC, router=router) + update = resource_processing_queue.ResourceUpdate( + router['id'], resource_processing_queue.PRIORITY_RPC, + resource=router) events_queue = [] @@ -2680,7 +2681,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): safe_router_removed.assert_not_called() self.assertEqual(1, len(events_queue)) self.assertEqual(related_router['id'], events_queue[0].id) - self.assertEqual(router_processing_queue.PRIORITY_RELATED_ROUTER, + self.assertEqual(resource_processing_queue.PRIORITY_RELATED_ROUTER, events_queue[0].priority) def test_process_routers_if_compatible_router_not_compatible(self): @@ -2688,8 +2689,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): router = {'id': _uuid()} agent.router_info = [router['id']] self.plugin_api.get_routers.return_value = [router] - update = router_processing_queue.RouterUpdate( - router['id'], router_processing_queue.PRIORITY_RPC, router=router) + update = resource_processing_queue.ResourceUpdate( + router['id'], resource_processing_queue.PRIORITY_RPC, + resource=router) with mock.patch.object( agent, "_process_router_if_compatible", @@ -2721,9 +2723,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): 'admin_state_up': True} self.plugin_api.get_routers.return_value = [updated_router] - update = router_processing_queue.RouterUpdate( - updated_router['id'], router_processing_queue.PRIORITY_RPC, - router=updated_router) + update = resource_processing_queue.ResourceUpdate( + updated_router['id'], resource_processing_queue.PRIORITY_RPC, + resource=updated_router) with mock.patch.object(agent, "_safe_router_removed" @@ -2751,9 +2753,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): 'admin_state_up': True} self.plugin_api.get_routers.return_value = [updated_router] - update = router_processing_queue.RouterUpdate( - updated_router['id'], router_processing_queue.PRIORITY_RPC, - router=updated_router) + update = resource_processing_queue.ResourceUpdate( + updated_router['id'], resource_processing_queue.PRIORITY_RPC, + resource=updated_router) with mock.patch.object(agent, "_safe_router_removed" @@ -2769,8 +2771,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = {'id': _uuid()} self.plugin_api.get_routers.return_value = [router] - update = router_processing_queue.RouterUpdate( - router['id'], router_processing_queue.PRIORITY_RPC, router=router) + update = resource_processing_queue.ResourceUpdate( + router['id'], resource_processing_queue.PRIORITY_RPC, + resource=router) with mock.patch.object( agent, "_process_router_if_compatible",