[L3] Use processing queue for network update events
Router_info's _process_internal_ports() method is the one which is
manipulating router_info.internal_ports cache and network_update()
method from the L3 agent is relying on that Router_info's cache to
check if updated network is connected to the router or not.
So they shouldn't be run together as that may cause some race conditions
and unexpected issues, like e.g. described in the related bug.
Until now, network_update event was the only one which was processed
without using queue of events. And because of that such race condition
as described above were possible.
To fix that, this patch changes network_update method in the way that it
now adds update events for each router hosted by agent to the queue.
Those events for single routers are then processed, checks if network is
actually connected to the router and if yes, schedules router update to
be processed.
Conflicts:
neutron/agent/l3/agent.py
Closes-Bug: #1933234
Change-Id: I2efe66a7415f7a18fb85bd2536a1901e751d6203
(cherry picked from commit 6ce48c30bd
)
This commit is contained in:
parent
25f4864d12
commit
604b055c71
|
@ -78,6 +78,7 @@ DELETE_RELATED_ROUTER = 2
|
|||
ADD_UPDATE_ROUTER = 3
|
||||
ADD_UPDATE_RELATED_ROUTER = 4
|
||||
PD_UPDATE = 5
|
||||
UPDATE_NETWORK = 6
|
||||
|
||||
RELATED_ACTION_MAP = {DELETE_ROUTER: DELETE_RELATED_ROUTER,
|
||||
ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER}
|
||||
|
@ -595,16 +596,26 @@ class L3NATAgent(ha.AgentMixin,
|
|||
network_id = kwargs['network']['id']
|
||||
LOG.debug("Got network %s update", network_id)
|
||||
for ri in self.router_info.values():
|
||||
LOG.debug("Checking if router %s is plugged to the network %s",
|
||||
ri, network_id)
|
||||
ports = list(ri.internal_ports)
|
||||
if ri.ex_gw_port:
|
||||
ports.append(ri.ex_gw_port)
|
||||
port_belongs = lambda p: p['network_id'] == network_id
|
||||
if any(port_belongs(p) for p in ports):
|
||||
update = queue.ResourceUpdate(
|
||||
ri.router_id, PRIORITY_SYNC_ROUTERS_TASK)
|
||||
self._resync_router(update)
|
||||
update = queue.ResourceUpdate(ri.router_id,
|
||||
PRIORITY_RPC,
|
||||
action=UPDATE_NETWORK,
|
||||
resource=network_id)
|
||||
self._queue.add(update)
|
||||
|
||||
def _process_network_update(self, router_id, network_id):
|
||||
ri = self.router_info.get(router_id)
|
||||
if not ri:
|
||||
return
|
||||
LOG.debug("Checking if router %s is plugged to the network %s",
|
||||
ri, network_id)
|
||||
ports = list(ri.internal_ports)
|
||||
if ri.ex_gw_port:
|
||||
ports.append(ri.ex_gw_port)
|
||||
port_belongs = lambda p: p['network_id'] == network_id
|
||||
if any(port_belongs(p) for p in ports):
|
||||
update = queue.ResourceUpdate(
|
||||
ri.router_id, PRIORITY_SYNC_ROUTERS_TASK)
|
||||
self._resync_router(update)
|
||||
|
||||
def _process_router_if_compatible(self, router):
|
||||
# Either ex_net_id or handle_internal_only_routers must be set
|
||||
|
@ -690,72 +701,85 @@ class L3NATAgent(ha.AgentMixin,
|
|||
router_update.resource = None # Force the agent to resync the router
|
||||
self._queue.add(router_update)
|
||||
|
||||
def _process_router_update(self):
|
||||
def _process_update(self):
|
||||
if self._exiting:
|
||||
return
|
||||
|
||||
for rp, update in self._queue.each_update_to_next_resource():
|
||||
LOG.info("Starting router update for %s, action %s, priority %s, "
|
||||
LOG.info("Starting processing update %s, action %s, priority %s, "
|
||||
"update_id %s. Wait time elapsed: %.3f",
|
||||
update.id, update.action, update.priority,
|
||||
update.update_id,
|
||||
update.time_elapsed_since_create)
|
||||
if update.action == PD_UPDATE:
|
||||
self.pd.process_prefix_update()
|
||||
LOG.info("Finished a router update for %s IPv6 PD, "
|
||||
"update_id. %s. Time elapsed: %.3f",
|
||||
update.id, update.update_id,
|
||||
update.time_elapsed_since_start)
|
||||
continue
|
||||
if update.action == UPDATE_NETWORK:
|
||||
self._process_network_update(
|
||||
router_id=update.id,
|
||||
network_id=update.resource)
|
||||
else:
|
||||
self._process_router_update(rp, update)
|
||||
|
||||
routers = [update.resource] if update.resource else []
|
||||
def _process_router_update(self, rp, update):
|
||||
LOG.info("Starting router update for %s, action %s, priority %s, "
|
||||
"update_id %s. Wait time elapsed: %.3f",
|
||||
update.id, update.action, update.priority,
|
||||
update.update_id,
|
||||
update.time_elapsed_since_create)
|
||||
if update.action == PD_UPDATE:
|
||||
self.pd.process_prefix_update()
|
||||
LOG.info("Finished a router update for %s IPv6 PD, "
|
||||
"update_id. %s. Time elapsed: %.3f",
|
||||
update.id, update.update_id,
|
||||
update.time_elapsed_since_start)
|
||||
return
|
||||
|
||||
not_delete_no_routers = (update.action != DELETE_ROUTER and
|
||||
not routers)
|
||||
related_action = update.action in (DELETE_RELATED_ROUTER,
|
||||
ADD_UPDATE_RELATED_ROUTER)
|
||||
if not_delete_no_routers or related_action:
|
||||
try:
|
||||
update.timestamp = timeutils.utcnow()
|
||||
routers = self.plugin_rpc.get_routers(self.context,
|
||||
[update.id])
|
||||
except Exception:
|
||||
msg = "Failed to fetch router information for '%s'"
|
||||
LOG.exception(msg, update.id)
|
||||
self._resync_router(update)
|
||||
continue
|
||||
routers = [update.resource] if update.resource else []
|
||||
|
||||
# For a related action, verify the router is still hosted here,
|
||||
# since it could have just been deleted and we don't want to
|
||||
# add it back.
|
||||
if related_action:
|
||||
routers = [r for r in routers if r['id'] == update.id]
|
||||
|
||||
if not routers:
|
||||
removed = self._safe_router_removed(update.id)
|
||||
if not removed:
|
||||
self._resync_router(update)
|
||||
else:
|
||||
# need to update timestamp of removed router in case
|
||||
# there are older events for the same router in the
|
||||
# processing queue (like events from fullsync) in order to
|
||||
# prevent deleted router re-creation
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
LOG.info("Finished a router update for %s, update_id %s. "
|
||||
"Time elapsed: %.3f",
|
||||
update.id, update.update_id,
|
||||
update.time_elapsed_since_start)
|
||||
continue
|
||||
|
||||
if not self._process_routers_if_compatible(routers, update):
|
||||
not_delete_no_routers = (update.action != DELETE_ROUTER and
|
||||
not routers)
|
||||
related_action = update.action in (DELETE_RELATED_ROUTER,
|
||||
ADD_UPDATE_RELATED_ROUTER)
|
||||
if not_delete_no_routers or related_action:
|
||||
try:
|
||||
update.timestamp = timeutils.utcnow()
|
||||
routers = self.plugin_rpc.get_routers(self.context,
|
||||
[update.id])
|
||||
except Exception:
|
||||
msg = "Failed to fetch router information for '%s'"
|
||||
LOG.exception(msg, update.id)
|
||||
self._resync_router(update)
|
||||
continue
|
||||
return
|
||||
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
LOG.info("Finished a router update for %s, update_id %s. "
|
||||
# For a related action, verify the router is still hosted here,
|
||||
# since it could have just been deleted and we don't want to
|
||||
# add it back.
|
||||
if related_action:
|
||||
routers = [r for r in routers if r['id'] == update.id]
|
||||
|
||||
if not routers:
|
||||
removed = self._safe_router_removed(update.id)
|
||||
if not removed:
|
||||
self._resync_router(update)
|
||||
else:
|
||||
# need to update timestamp of removed router in case
|
||||
# there are older events for the same router in the
|
||||
# processing queue (like events from fullsync) in order to
|
||||
# prevent deleted router re-creation
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
LOG.info("Finished a router delete for %s, update_id %s. "
|
||||
"Time elapsed: %.3f",
|
||||
update.id, update.update_id,
|
||||
update.time_elapsed_since_start)
|
||||
return
|
||||
|
||||
if not self._process_routers_if_compatible(routers, update):
|
||||
self._resync_router(update)
|
||||
return
|
||||
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
LOG.info("Finished a router update for %s, update_id %s. "
|
||||
"Time elapsed: %.3f",
|
||||
update.id, update.update_id,
|
||||
update.time_elapsed_since_start)
|
||||
|
||||
def _process_routers_if_compatible(self, routers, update):
|
||||
process_result = True
|
||||
|
@ -804,7 +828,7 @@ class L3NATAgent(ha.AgentMixin,
|
|||
def _process_routers_loop(self):
|
||||
LOG.debug("Starting _process_routers_loop")
|
||||
while not self._exiting:
|
||||
self._pool.spawn_n(self._process_router_update)
|
||||
self._pool.spawn_n(self._process_update)
|
||||
|
||||
# NOTE(kevinbenton): this is set to 1 second because the actual interval
|
||||
# is controlled by a FixedIntervalLoopingCall in neutron/service.py that
|
||||
|
|
|
@ -226,7 +226,7 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
|
|||
|
||||
# make sure all events are processed
|
||||
while not self.agent._queue._queue.empty():
|
||||
self.agent._process_router_update()
|
||||
self.agent._process_update()
|
||||
|
||||
for r in routers_to_keep:
|
||||
self.assertIn(r['id'], self.agent.router_info)
|
||||
|
|
|
@ -2266,11 +2266,11 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
agent._create_router = mock.Mock(return_value=ri)
|
||||
agent._fetch_external_net_id = mock.Mock(
|
||||
return_value=router['external_gateway_info']['network_id'])
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
log_exception.assert_has_calls(calls)
|
||||
|
||||
ri.initialize.side_effect = None
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
self.assertTrue(ri.delete.called)
|
||||
self.assertEqual(2, ri.initialize.call_count)
|
||||
self.assertEqual(2, agent._create_router.call_count)
|
||||
|
@ -2577,6 +2577,17 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
self.assertFalse(agent._queue.add.called)
|
||||
|
||||
def test_network_update(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent.router_info = {
|
||||
_uuid(): mock.Mock(),
|
||||
_uuid(): mock.Mock()}
|
||||
network_id = _uuid()
|
||||
agent._queue = mock.Mock()
|
||||
network = {'id': network_id}
|
||||
agent.network_update(None, network=network)
|
||||
self.assertEqual(2, agent._queue.add.call_count)
|
||||
|
||||
def test__process_network_update(self):
|
||||
router = l3_test_common.prepare_router_data(num_internal_ports=2)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent._process_added_router(router)
|
||||
|
@ -2585,10 +2596,27 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
internal_ports = ri.router.get(lib_constants.INTERFACE_KEY, [])
|
||||
network_id = internal_ports[0]['network_id']
|
||||
agent._queue = mock.Mock()
|
||||
network = {'id': network_id}
|
||||
agent.network_update(None, network=network)
|
||||
agent._process_network_update(ri.router_id, network_id)
|
||||
self.assertEqual(1, agent._queue.add.call_count)
|
||||
|
||||
def test__process_network_update_no_router_info_found(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
network_id = _uuid()
|
||||
agent._queue = mock.Mock()
|
||||
agent._process_network_update(_uuid(), network_id)
|
||||
agent._queue.add.assert_not_called()
|
||||
|
||||
def test__process_network_update_not_connected_to_router(self):
|
||||
router = l3_test_common.prepare_router_data(num_internal_ports=2)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent._process_added_router(router)
|
||||
ri = l3router.RouterInfo(agent, router['id'],
|
||||
router, **self.ri_kwargs)
|
||||
network_id = _uuid()
|
||||
agent._queue = mock.Mock()
|
||||
agent._process_network_update(ri.router_id, network_id)
|
||||
agent._queue.add.assert_not_called()
|
||||
|
||||
def test_create_router_namespace(self):
|
||||
self.mock_ip.ensure_namespace.return_value = self.mock_ip
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
|
@ -2724,7 +2752,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
update.resource = None
|
||||
agent._queue.each_update_to_next_resource.side_effect = [
|
||||
[(None, update)]]
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
self.assertFalse(agent.fullsync)
|
||||
self.assertEqual(ext_net_call,
|
||||
agent._process_router_if_compatible.called)
|
||||
|
@ -2751,13 +2779,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
resource=router,
|
||||
timestamp=timeutils.utcnow())
|
||||
agent._queue.add(update)
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
|
||||
# The update contained the router object, get_routers won't be called
|
||||
self.assertFalse(agent.plugin_rpc.get_routers.called)
|
||||
|
||||
# The update failed, assert that get_routers was called
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
self.assertTrue(agent.plugin_rpc.get_routers.called)
|
||||
|
||||
def test_process_routers_update_rpc_timeout_on_get_ext_net(self):
|
||||
|
@ -2781,7 +2809,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
agent.plugin_rpc.get_routers.side_effect = (
|
||||
Exception("Failed to get router info"))
|
||||
# start test
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
router_info.delete.assert_not_called()
|
||||
self.assertFalse(router_info.delete.called)
|
||||
self.assertTrue(agent.router_info)
|
||||
|
@ -2804,7 +2832,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
agent._safe_router_removed = mock.Mock()
|
||||
if error:
|
||||
agent._safe_router_removed.return_value = False
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
if error:
|
||||
self.assertFalse(router_processor.fetched_and_processed.called)
|
||||
agent._resync_router.assert_called_with(update)
|
||||
|
|
Loading…
Reference in New Issue