Rename router processing queue code to be more generic

Moved the router processing queue code to the agent/common
directory and renamed it "resource processing queue".  This
way it can be consumed by other agents, or possibly even
moved to neutron-lib in the future.

Conflicts:
    neutron/agent/common/resource_processing_queue.py
    neutron/agent/l3/agent.py
    neutron/tests/unit/agent/l3/test_agent.py

Change-Id: I735cf5b0a915828c420c3316b78a48f6d54035e6
(cherry picked from commit f24f3b6b7b)
(cherry picked from commit 8f8c899c69)
changes/79/649579/2
Brian Haley 4 years ago committed by Slawek Kaplonski
parent 871b86390a
commit 2ec575f084
  1. 99
      neutron/agent/common/resource_processing_queue.py
  2. 48
      neutron/agent/l3/agent.py
  3. 65
      neutron/tests/unit/agent/common/test_resource_processing_queue.py
  4. 51
      neutron/tests/unit/agent/l3/test_agent.py

@ -35,21 +35,24 @@ RELATED_ACTION_MAP = {DELETE_ROUTER: DELETE_RELATED_ROUTER,
ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER}
class RouterUpdate(object):
"""Encapsulates a router update
class ResourceUpdate(object):
"""Encapsulates a resource update
An instance of this object carries the information necessary to prioritize
and process a request to update a router.
and process a request to update a resource.
Priority values are ordered from higher (0) to lower (>0) by the caller,
and are therefore not defined here, but must be done by the consumer.
"""
def __init__(self, router_id, priority,
action=None, router=None, timestamp=None, tries=5):
def __init__(self, id, priority,
action=None, resource=None, timestamp=None, tries=5):
self.priority = priority
self.timestamp = timestamp
if not timestamp:
self.timestamp = timeutils.utcnow()
self.id = router_id
self.id = id
self.action = action
self.router = router
self.resource = resource
self.tries = tries
def __lt__(self, other):
@ -71,40 +74,40 @@ class RouterUpdate(object):
return self.tries < 0
class ExclusiveRouterProcessor(object):
"""Manager for access to a router for processing
class ExclusiveResourceProcessor(object):
"""Manager for access to a resource for processing
This class controls access to a router in a non-blocking way. The first
instance to be created for a given router_id is granted exclusive access to
the router.
This class controls access to a resource in a non-blocking way. The first
instance to be created for a given ID is granted exclusive access to
the resource.
Other instances may be created for the same router_id while the first
Other instances may be created for the same ID while the first
instance has exclusive access. If that happens then it doesn't block and
wait for access. Instead, it signals to the master instance that an update
came in with the timestamp.
This way, a thread will not block to wait for access to a router. Instead
it effectively signals to the thread that is working on the router that
something has changed since it started working on it. That thread will
simply finish its current iteration and then repeat.
This way, a thread will not block to wait for access to a resource.
Instead it effectively signals to the thread that is working on the
resource that something has changed since it started working on it.
That thread will simply finish its current iteration and then repeat.
This class keeps track of the last time that a router data was fetched and
This class keeps track of the last time that resource data was fetched and
processed. The timestamp that it keeps must be before when the data used
to process the router last was fetched from the database. But, as close as
possible. The timestamp should not be recorded, however, until the router
has been processed using the fetch data.
to process the resource last was fetched from the database. But, as close
as possible. The timestamp should not be recorded, however, until the
resource has been processed using the fetch data.
"""
_masters = {}
_router_timestamps = {}
_resource_timestamps = {}
def __init__(self, router_id):
self._router_id = router_id
def __init__(self, id):
self._id = id
if router_id not in self._masters:
self._masters[router_id] = self
if id not in self._masters:
self._masters[id] = self
self._queue = []
self._master = self._masters[router_id]
self._master = self._masters[id]
def _i_am_master(self):
return self == self._master
@ -114,44 +117,44 @@ class ExclusiveRouterProcessor(object):
def __exit__(self, type, value, traceback):
if self._i_am_master():
del self._masters[self._router_id]
del self._masters[self._id]
def _get_router_data_timestamp(self):
return self._router_timestamps.get(self._router_id,
datetime.datetime.min)
def _get_resource_data_timestamp(self):
return self._resource_timestamps.get(self._id,
datetime.datetime.min)
def fetched_and_processed(self, timestamp):
"""Records the data timestamp after it is used to update the router"""
new_timestamp = max(timestamp, self._get_router_data_timestamp())
self._router_timestamps[self._router_id] = new_timestamp
"""Records the timestamp after it is used to update the resource"""
new_timestamp = max(timestamp, self._get_resource_data_timestamp())
self._resource_timestamps[self._id] = new_timestamp
def queue_update(self, update):
"""Queues an update from a worker
This is the queue used to keep new updates that come in while a router
is being processed. These updates have already bubbled to the front of
the RouterProcessingQueue.
This is the queue used to keep new updates that come in while a
resource is being processed. These updates have already bubbled to
the front of the ResourceProcessingQueue.
"""
self._master._queue.append(update)
def updates(self):
"""Processes the router until updates stop coming
"""Processes the resource until updates stop coming
Only the master instance will process the router. However, updates may
come in from other workers while it is in progress. This method loops
until they stop coming.
Only the master instance will process the resource. However, updates
may come in from other workers while it is in progress. This method
loops until they stop coming.
"""
if self._i_am_master():
while self._queue:
# Remove the update from the queue even if it is old.
update = self._queue.pop(0)
# Process the update only if it is fresh.
if self._get_router_data_timestamp() < update.timestamp:
if self._get_resource_data_timestamp() < update.timestamp:
yield update
class RouterProcessingQueue(object):
"""Manager of the queue of routers to process."""
class ResourceProcessingQueue(object):
"""Manager of the queue of resources to process."""
def __init__(self):
self._queue = Queue.PriorityQueue()
@ -159,15 +162,15 @@ class RouterProcessingQueue(object):
update.tries -= 1
self._queue.put(update)
def each_update_to_next_router(self):
"""Grabs the next router from the queue and processes
def each_update_to_next_resource(self):
"""Grabs the next resource from the queue and processes
This method uses a for loop to process the router repeatedly until
This method uses a for loop to process the resource repeatedly until
updates stop bubbling to the front of the queue.
"""
next_update = self._queue.get()
with ExclusiveRouterProcessor(next_update.id) as rp:
with ExclusiveResourceProcessor(next_update.id) as rp:
# Queue the update whether this worker is the master or not.
rp.queue_update(next_update)

@ -33,6 +33,7 @@ from oslo_utils import timeutils
from osprofiler import profiler
from neutron._i18n import _
from neutron.agent.common import resource_processing_queue as queue
from neutron.agent.common import utils as common_utils
from neutron.agent.l3 import dvr
from neutron.agent.l3 import dvr_edge_ha_router
@ -44,7 +45,6 @@ from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
from neutron.agent.l3 import l3_agent_extensions_manager as l3_ext_manager
from neutron.agent.l3 import legacy_router
from neutron.agent.l3 import namespace_manager
from neutron.agent.l3 import router_processing_queue as queue
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import pd
@ -239,7 +239,7 @@ class L3NATAgent(ha.AgentMixin,
self.driver,
self.metadata_driver)
self._queue = queue.RouterProcessingQueue()
self._queue = queue.ResourceProcessingQueue()
super(L3NATAgent, self).__init__(host=self.conf.host)
self.target_ex_net_id = None
@ -418,9 +418,9 @@ class L3NATAgent(ha.AgentMixin,
def router_deleted(self, context, router_id):
"""Deal with router deletion RPC message."""
LOG.debug('Got router deleted notification for %s', router_id)
update = queue.RouterUpdate(router_id,
queue.PRIORITY_RPC,
action=queue.DELETE_ROUTER)
update = queue.ResourceUpdate(router_id,
queue.PRIORITY_RPC,
action=queue.DELETE_ROUTER)
self._queue.add(update)
def routers_updated(self, context, routers):
@ -431,16 +431,16 @@ class L3NATAgent(ha.AgentMixin,
if isinstance(routers[0], dict):
routers = [router['id'] for router in routers]
for id in routers:
update = queue.RouterUpdate(
update = queue.ResourceUpdate(
id, queue.PRIORITY_RPC, action=queue.ADD_UPDATE_ROUTER)
self._queue.add(update)
def router_removed_from_agent(self, context, payload):
LOG.debug('Got router removed from agent :%r', payload)
router_id = payload['router_id']
update = queue.RouterUpdate(router_id,
queue.PRIORITY_RPC,
action=queue.DELETE_ROUTER)
update = queue.ResourceUpdate(router_id,
queue.PRIORITY_RPC,
action=queue.DELETE_ROUTER)
self._queue.add(update)
def router_added_to_agent(self, context, payload):
@ -455,7 +455,7 @@ class L3NATAgent(ha.AgentMixin,
ports.append(ri.ex_gw_port)
port_belongs = lambda p: p['network_id'] == network_id
if any(port_belongs(p) for p in ports):
update = queue.RouterUpdate(
update = queue.ResourceUpdate(
ri.router_id, queue.PRIORITY_SYNC_ROUTERS_TASK)
self._resync_router(update)
@ -541,11 +541,11 @@ class L3NATAgent(ha.AgentMixin,
return
router_update.timestamp = timeutils.utcnow()
router_update.priority = priority
router_update.router = None # Force the agent to resync the router
router_update.resource = None # Force the agent to resync the router
self._queue.add(router_update)
def _process_router_update(self):
for rp, update in self._queue.each_update_to_next_router():
for rp, update in self._queue.each_update_to_next_resource():
LOG.debug("Starting router update for %s, action %s, priority %s",
update.id, update.action, update.priority)
if update.action == queue.PD_UPDATE:
@ -553,7 +553,7 @@ class L3NATAgent(ha.AgentMixin,
LOG.debug("Finished a router update for %s", update.id)
continue
routers = [update.router] if update.router else []
routers = [update.resource] if update.resource else []
not_delete_no_routers = (update.action != queue.DELETE_ROUTER and
not routers)
@ -605,7 +605,7 @@ class L3NATAgent(ha.AgentMixin,
# on it already and we don't want to race.
new_action = queue.RELATED_ACTION_MAP.get(
update.action, queue.ADD_UPDATE_RELATED_ROUTER)
new_update = queue.RouterUpdate(
new_update = queue.ResourceUpdate(
router['id'],
priority=queue.PRIORITY_RELATED_ROUTER,
action=new_action)
@ -697,10 +697,10 @@ class L3NATAgent(ha.AgentMixin,
ns_manager.keep_ext_net(ext_net_id)
elif is_snat_agent and not r.get('ha'):
ns_manager.ensure_snat_cleanup(r['id'])
update = queue.RouterUpdate(
update = queue.ResourceUpdate(
r['id'],
queue.PRIORITY_SYNC_ROUTERS_TASK,
router=r,
resource=r,
action=queue.ADD_UPDATE_ROUTER,
timestamp=timestamp)
self._queue.add(update)
@ -736,10 +736,10 @@ class L3NATAgent(ha.AgentMixin,
# Delete routers that have disappeared since the last sync
for router_id in prev_router_ids - curr_router_ids:
ns_manager.keep_router(router_id)
update = queue.RouterUpdate(router_id,
queue.PRIORITY_SYNC_ROUTERS_TASK,
timestamp=timestamp,
action=queue.DELETE_ROUTER)
update = queue.ResourceUpdate(router_id,
queue.PRIORITY_SYNC_ROUTERS_TASK,
timestamp=timestamp,
action=queue.DELETE_ROUTER)
self._queue.add(update)
@property
@ -760,10 +760,10 @@ class L3NATAgent(ha.AgentMixin,
def create_pd_router_update(self):
router_id = None
update = queue.RouterUpdate(router_id,
queue.PRIORITY_PD_UPDATE,
timestamp=timeutils.utcnow(),
action=queue.PD_UPDATE)
update = queue.ResourceUpdate(router_id,
queue.PRIORITY_PD_UPDATE,
timestamp=timeutils.utcnow(),
action=queue.PD_UPDATE)
self._queue.add(update)

@ -17,21 +17,23 @@ import datetime
from oslo_utils import uuidutils
from neutron.agent.l3 import router_processing_queue as l3_queue
from neutron.agent.common import resource_processing_queue as queue
from neutron.tests import base
_uuid = uuidutils.generate_uuid
FAKE_ID = _uuid()
FAKE_ID_2 = _uuid()
PRIORITY_RPC = 0
class TestExclusiveRouterProcessor(base.BaseTestCase):
class TestExclusiveResourceProcessor(base.BaseTestCase):
def test_i_am_master(self):
master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
not_master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
master = queue.ExclusiveResourceProcessor(FAKE_ID)
not_master = queue.ExclusiveResourceProcessor(FAKE_ID)
master_2 = queue.ExclusiveResourceProcessor(FAKE_ID_2)
not_master_2 = queue.ExclusiveResourceProcessor(FAKE_ID_2)
self.assertTrue(master._i_am_master())
self.assertFalse(not_master._i_am_master())
@ -42,10 +44,10 @@ class TestExclusiveRouterProcessor(base.BaseTestCase):
master_2.__exit__(None, None, None)
def test_master(self):
master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
not_master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
master = queue.ExclusiveResourceProcessor(FAKE_ID)
not_master = queue.ExclusiveResourceProcessor(FAKE_ID)
master_2 = queue.ExclusiveResourceProcessor(FAKE_ID_2)
not_master_2 = queue.ExclusiveResourceProcessor(FAKE_ID_2)
self.assertEqual(master, master._master)
self.assertEqual(master, not_master._master)
@ -56,56 +58,55 @@ class TestExclusiveRouterProcessor(base.BaseTestCase):
master_2.__exit__(None, None, None)
def test__enter__(self):
self.assertNotIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters)
master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
self.assertNotIn(FAKE_ID, queue.ExclusiveResourceProcessor._masters)
master = queue.ExclusiveResourceProcessor(FAKE_ID)
master.__enter__()
self.assertIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters)
self.assertIn(FAKE_ID, queue.ExclusiveResourceProcessor._masters)
master.__exit__(None, None, None)
def test__exit__(self):
master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
master = queue.ExclusiveResourceProcessor(FAKE_ID)
not_master = queue.ExclusiveResourceProcessor(FAKE_ID)
master.__enter__()
self.assertIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters)
self.assertIn(FAKE_ID, queue.ExclusiveResourceProcessor._masters)
not_master.__enter__()
not_master.__exit__(None, None, None)
self.assertIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters)
self.assertIn(FAKE_ID, queue.ExclusiveResourceProcessor._masters)
master.__exit__(None, None, None)
self.assertNotIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters)
self.assertNotIn(FAKE_ID, queue.ExclusiveResourceProcessor._masters)
def test_data_fetched_since(self):
master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
master = queue.ExclusiveResourceProcessor(FAKE_ID)
self.assertEqual(datetime.datetime.min,
master._get_router_data_timestamp())
master._get_resource_data_timestamp())
ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
ts2 = datetime.datetime.utcnow()
master.fetched_and_processed(ts2)
self.assertEqual(ts2, master._get_router_data_timestamp())
self.assertEqual(ts2, master._get_resource_data_timestamp())
master.fetched_and_processed(ts1)
self.assertEqual(ts2, master._get_router_data_timestamp())
self.assertEqual(ts2, master._get_resource_data_timestamp())
master.__exit__(None, None, None)
def test_updates(self):
master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
master = queue.ExclusiveResourceProcessor(FAKE_ID)
not_master = queue.ExclusiveResourceProcessor(FAKE_ID)
master.queue_update(l3_queue.RouterUpdate(FAKE_ID, 0))
not_master.queue_update(l3_queue.RouterUpdate(FAKE_ID, 0))
master.queue_update(queue.ResourceUpdate(FAKE_ID, 0))
not_master.queue_update(queue.ResourceUpdate(FAKE_ID, 0))
for update in not_master.updates():
raise Exception("Only the master should process a router")
raise Exception("Only the master should process a resource")
self.assertEqual(2, len([i for i in master.updates()]))
def test_hit_retry_limit(self):
tries = 1
queue = l3_queue.RouterProcessingQueue()
update = l3_queue.RouterUpdate(FAKE_ID, l3_queue.PRIORITY_RPC,
tries=tries)
queue.add(update)
rpqueue = queue.ResourceProcessingQueue()
update = queue.ResourceUpdate(FAKE_ID, PRIORITY_RPC, tries=tries)
rpqueue.add(update)
self.assertFalse(update.hit_retry_limit())
queue.add(update)
rpqueue.add(update)
self.assertTrue(update.hit_retry_limit())

@ -32,6 +32,7 @@ from oslo_utils import timeutils
from oslo_utils import uuidutils
from testtools import matchers
from neutron.agent.common import resource_processing_queue
from neutron.agent.l3 import agent as l3_agent
from neutron.agent.l3 import dvr_edge_router as dvr_router
from neutron.agent.l3 import dvr_router_base
@ -42,7 +43,6 @@ from neutron.agent.l3 import link_local_allocator as lla
from neutron.agent.l3 import namespace_manager
from neutron.agent.l3 import namespaces
from neutron.agent.l3 import router_info as l3router
from neutron.agent.l3 import router_processing_queue
from neutron.agent.linux import dibbler
from neutron.agent.linux import interface
from neutron.agent.linux import iptables_manager
@ -2094,10 +2094,10 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router = {'id': _uuid(),
'external_gateway_info': {'network_id': 'aaa'}}
self.plugin_api.get_routers.return_value = [router]
update = router_processing_queue.RouterUpdate(
update = resource_processing_queue.ResourceUpdate(
router['id'],
router_processing_queue.PRIORITY_SYNC_ROUTERS_TASK,
router=router,
resource_processing_queue.PRIORITY_SYNC_ROUTERS_TASK,
resource=router,
timestamp=timeutils.utcnow())
agent._queue.add(update)
@ -2572,8 +2572,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
agent._resync_router = mock.Mock()
update = mock.Mock()
update.id = router_id
update.router = None
agent._queue.each_update_to_next_router.side_effect = [
update.resource = None
agent._queue.each_update_to_next_resource.side_effect = [
[(None, update)]]
agent._process_router_update()
self.assertFalse(agent.fullsync)
@ -2596,10 +2596,10 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
agent._process_router_if_compatible.side_effect = RuntimeError()
# Queue an update from a full sync
update = router_processing_queue.RouterUpdate(
update = resource_processing_queue.ResourceUpdate(
router_id,
router_processing_queue.PRIORITY_SYNC_ROUTERS_TASK,
router=router,
resource_processing_queue.PRIORITY_SYNC_ROUTERS_TASK,
resource=router,
timestamp=timeutils.utcnow())
agent._queue.add(update)
agent._process_router_update()
@ -2621,12 +2621,12 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
agent._queue = mock.Mock()
update = mock.Mock()
update.router = None
update.resource = None
update.action = 1 # ROUTER_DELETED
router_info = mock.MagicMock()
agent.router_info[update.id] = router_info
router_processor = mock.Mock()
agent._queue.each_update_to_next_router.side_effect = [
agent._queue.each_update_to_next_resource.side_effect = [
[(router_processor, update)]]
agent._resync_router = mock.Mock()
if error:
@ -2657,8 +2657,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
related_router = {'id': _uuid()}
routers = [router, related_router]
self.plugin_api.get_routers.return_value = routers
update = router_processing_queue.RouterUpdate(
router['id'], router_processing_queue.PRIORITY_RPC, router=router)
update = resource_processing_queue.ResourceUpdate(
router['id'], resource_processing_queue.PRIORITY_RPC,
resource=router)
events_queue = []
@ -2680,7 +2681,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
safe_router_removed.assert_not_called()
self.assertEqual(1, len(events_queue))
self.assertEqual(related_router['id'], events_queue[0].id)
self.assertEqual(router_processing_queue.PRIORITY_RELATED_ROUTER,
self.assertEqual(resource_processing_queue.PRIORITY_RELATED_ROUTER,
events_queue[0].priority)
def test_process_routers_if_compatible_router_not_compatible(self):
@ -2688,8 +2689,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router = {'id': _uuid()}
agent.router_info = [router['id']]
self.plugin_api.get_routers.return_value = [router]
update = router_processing_queue.RouterUpdate(
router['id'], router_processing_queue.PRIORITY_RPC, router=router)
update = resource_processing_queue.ResourceUpdate(
router['id'], resource_processing_queue.PRIORITY_RPC,
resource=router)
with mock.patch.object(
agent, "_process_router_if_compatible",
@ -2721,9 +2723,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
'admin_state_up': True}
self.plugin_api.get_routers.return_value = [updated_router]
update = router_processing_queue.RouterUpdate(
updated_router['id'], router_processing_queue.PRIORITY_RPC,
router=updated_router)
update = resource_processing_queue.ResourceUpdate(
updated_router['id'], resource_processing_queue.PRIORITY_RPC,
resource=updated_router)
with mock.patch.object(agent,
"_safe_router_removed"
@ -2751,9 +2753,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
'admin_state_up': True}
self.plugin_api.get_routers.return_value = [updated_router]
update = router_processing_queue.RouterUpdate(
updated_router['id'], router_processing_queue.PRIORITY_RPC,
router=updated_router)
update = resource_processing_queue.ResourceUpdate(
updated_router['id'], resource_processing_queue.PRIORITY_RPC,
resource=updated_router)
with mock.patch.object(agent,
"_safe_router_removed"
@ -2769,8 +2771,9 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = {'id': _uuid()}
self.plugin_api.get_routers.return_value = [router]
update = router_processing_queue.RouterUpdate(
router['id'], router_processing_queue.PRIORITY_RPC, router=router)
update = resource_processing_queue.ResourceUpdate(
router['id'], resource_processing_queue.PRIORITY_RPC,
resource=router)
with mock.patch.object(
agent, "_process_router_if_compatible",

Loading…
Cancel
Save