Merge "[L3] Use processing queue for network update events" into stable/victoria
This commit is contained in:
commit
f220fe7176
|
@ -78,6 +78,7 @@ DELETE_RELATED_ROUTER = 2
|
|||
ADD_UPDATE_ROUTER = 3
|
||||
ADD_UPDATE_RELATED_ROUTER = 4
|
||||
PD_UPDATE = 5
|
||||
UPDATE_NETWORK = 6
|
||||
|
||||
RELATED_ACTION_MAP = {DELETE_ROUTER: DELETE_RELATED_ROUTER,
|
||||
ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER}
|
||||
|
@ -595,16 +596,26 @@ class L3NATAgent(ha.AgentMixin,
|
|||
network_id = kwargs['network']['id']
|
||||
LOG.debug("Got network %s update", network_id)
|
||||
for ri in self.router_info.values():
|
||||
LOG.debug("Checking if router %s is plugged to the network %s",
|
||||
ri, network_id)
|
||||
ports = list(ri.internal_ports)
|
||||
if ri.ex_gw_port:
|
||||
ports.append(ri.ex_gw_port)
|
||||
port_belongs = lambda p: p['network_id'] == network_id
|
||||
if any(port_belongs(p) for p in ports):
|
||||
update = queue.ResourceUpdate(
|
||||
ri.router_id, PRIORITY_SYNC_ROUTERS_TASK)
|
||||
self._resync_router(update)
|
||||
update = queue.ResourceUpdate(ri.router_id,
|
||||
PRIORITY_RPC,
|
||||
action=UPDATE_NETWORK,
|
||||
resource=network_id)
|
||||
self._queue.add(update)
|
||||
|
||||
def _process_network_update(self, router_id, network_id):
|
||||
ri = self.router_info.get(router_id)
|
||||
if not ri:
|
||||
return
|
||||
LOG.debug("Checking if router %s is plugged to the network %s",
|
||||
ri, network_id)
|
||||
ports = list(ri.internal_ports)
|
||||
if ri.ex_gw_port:
|
||||
ports.append(ri.ex_gw_port)
|
||||
port_belongs = lambda p: p['network_id'] == network_id
|
||||
if any(port_belongs(p) for p in ports):
|
||||
update = queue.ResourceUpdate(
|
||||
ri.router_id, PRIORITY_SYNC_ROUTERS_TASK)
|
||||
self._resync_router(update)
|
||||
|
||||
def _process_router_if_compatible(self, router):
|
||||
# Either ex_net_id or handle_internal_only_routers must be set
|
||||
|
@ -690,72 +701,85 @@ class L3NATAgent(ha.AgentMixin,
|
|||
router_update.resource = None # Force the agent to resync the router
|
||||
self._queue.add(router_update)
|
||||
|
||||
def _process_router_update(self):
|
||||
def _process_update(self):
|
||||
if self._exiting:
|
||||
return
|
||||
|
||||
for rp, update in self._queue.each_update_to_next_resource():
|
||||
LOG.info("Starting router update for %s, action %s, priority %s, "
|
||||
LOG.info("Starting processing update %s, action %s, priority %s, "
|
||||
"update_id %s. Wait time elapsed: %.3f",
|
||||
update.id, update.action, update.priority,
|
||||
update.update_id,
|
||||
update.time_elapsed_since_create)
|
||||
if update.action == PD_UPDATE:
|
||||
self.pd.process_prefix_update()
|
||||
LOG.info("Finished a router update for %s IPv6 PD, "
|
||||
"update_id. %s. Time elapsed: %.3f",
|
||||
update.id, update.update_id,
|
||||
update.time_elapsed_since_start)
|
||||
continue
|
||||
if update.action == UPDATE_NETWORK:
|
||||
self._process_network_update(
|
||||
router_id=update.id,
|
||||
network_id=update.resource)
|
||||
else:
|
||||
self._process_router_update(rp, update)
|
||||
|
||||
routers = [update.resource] if update.resource else []
|
||||
def _process_router_update(self, rp, update):
|
||||
LOG.info("Starting router update for %s, action %s, priority %s, "
|
||||
"update_id %s. Wait time elapsed: %.3f",
|
||||
update.id, update.action, update.priority,
|
||||
update.update_id,
|
||||
update.time_elapsed_since_create)
|
||||
if update.action == PD_UPDATE:
|
||||
self.pd.process_prefix_update()
|
||||
LOG.info("Finished a router update for %s IPv6 PD, "
|
||||
"update_id. %s. Time elapsed: %.3f",
|
||||
update.id, update.update_id,
|
||||
update.time_elapsed_since_start)
|
||||
return
|
||||
|
||||
not_delete_no_routers = (update.action != DELETE_ROUTER and
|
||||
not routers)
|
||||
related_action = update.action in (DELETE_RELATED_ROUTER,
|
||||
ADD_UPDATE_RELATED_ROUTER)
|
||||
if not_delete_no_routers or related_action:
|
||||
try:
|
||||
update.timestamp = timeutils.utcnow()
|
||||
routers = self.plugin_rpc.get_routers(self.context,
|
||||
[update.id])
|
||||
except Exception:
|
||||
msg = "Failed to fetch router information for '%s'"
|
||||
LOG.exception(msg, update.id)
|
||||
self._resync_router(update)
|
||||
continue
|
||||
routers = [update.resource] if update.resource else []
|
||||
|
||||
# For a related action, verify the router is still hosted here,
|
||||
# since it could have just been deleted and we don't want to
|
||||
# add it back.
|
||||
if related_action:
|
||||
routers = [r for r in routers if r['id'] == update.id]
|
||||
|
||||
if not routers:
|
||||
removed = self._safe_router_removed(update.id)
|
||||
if not removed:
|
||||
self._resync_router(update)
|
||||
else:
|
||||
# need to update timestamp of removed router in case
|
||||
# there are older events for the same router in the
|
||||
# processing queue (like events from fullsync) in order to
|
||||
# prevent deleted router re-creation
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
LOG.info("Finished a router update for %s, update_id %s. "
|
||||
"Time elapsed: %.3f",
|
||||
update.id, update.update_id,
|
||||
update.time_elapsed_since_start)
|
||||
continue
|
||||
|
||||
if not self._process_routers_if_compatible(routers, update):
|
||||
not_delete_no_routers = (update.action != DELETE_ROUTER and
|
||||
not routers)
|
||||
related_action = update.action in (DELETE_RELATED_ROUTER,
|
||||
ADD_UPDATE_RELATED_ROUTER)
|
||||
if not_delete_no_routers or related_action:
|
||||
try:
|
||||
update.timestamp = timeutils.utcnow()
|
||||
routers = self.plugin_rpc.get_routers(self.context,
|
||||
[update.id])
|
||||
except Exception:
|
||||
msg = "Failed to fetch router information for '%s'"
|
||||
LOG.exception(msg, update.id)
|
||||
self._resync_router(update)
|
||||
continue
|
||||
return
|
||||
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
LOG.info("Finished a router update for %s, update_id %s. "
|
||||
# For a related action, verify the router is still hosted here,
|
||||
# since it could have just been deleted and we don't want to
|
||||
# add it back.
|
||||
if related_action:
|
||||
routers = [r for r in routers if r['id'] == update.id]
|
||||
|
||||
if not routers:
|
||||
removed = self._safe_router_removed(update.id)
|
||||
if not removed:
|
||||
self._resync_router(update)
|
||||
else:
|
||||
# need to update timestamp of removed router in case
|
||||
# there are older events for the same router in the
|
||||
# processing queue (like events from fullsync) in order to
|
||||
# prevent deleted router re-creation
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
LOG.info("Finished a router delete for %s, update_id %s. "
|
||||
"Time elapsed: %.3f",
|
||||
update.id, update.update_id,
|
||||
update.time_elapsed_since_start)
|
||||
return
|
||||
|
||||
if not self._process_routers_if_compatible(routers, update):
|
||||
self._resync_router(update)
|
||||
return
|
||||
|
||||
rp.fetched_and_processed(update.timestamp)
|
||||
LOG.info("Finished a router update for %s, update_id %s. "
|
||||
"Time elapsed: %.3f",
|
||||
update.id, update.update_id,
|
||||
update.time_elapsed_since_start)
|
||||
|
||||
def _process_routers_if_compatible(self, routers, update):
|
||||
process_result = True
|
||||
|
@ -804,7 +828,7 @@ class L3NATAgent(ha.AgentMixin,
|
|||
def _process_routers_loop(self):
|
||||
LOG.debug("Starting _process_routers_loop")
|
||||
while not self._exiting:
|
||||
self._pool.spawn_n(self._process_router_update)
|
||||
self._pool.spawn_n(self._process_update)
|
||||
|
||||
# NOTE(kevinbenton): this is set to 1 second because the actual interval
|
||||
# is controlled by a FixedIntervalLoopingCall in neutron/service.py that
|
||||
|
|
|
@ -226,7 +226,7 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
|
|||
|
||||
# make sure all events are processed
|
||||
while not self.agent._queue._queue.empty():
|
||||
self.agent._process_router_update()
|
||||
self.agent._process_update()
|
||||
|
||||
for r in routers_to_keep:
|
||||
self.assertIn(r['id'], self.agent.router_info)
|
||||
|
|
|
@ -2266,11 +2266,11 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
agent._create_router = mock.Mock(return_value=ri)
|
||||
agent._fetch_external_net_id = mock.Mock(
|
||||
return_value=router['external_gateway_info']['network_id'])
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
log_exception.assert_has_calls(calls)
|
||||
|
||||
ri.initialize.side_effect = None
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
self.assertTrue(ri.delete.called)
|
||||
self.assertEqual(2, ri.initialize.call_count)
|
||||
self.assertEqual(2, agent._create_router.call_count)
|
||||
|
@ -2577,6 +2577,17 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
self.assertFalse(agent._queue.add.called)
|
||||
|
||||
def test_network_update(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent.router_info = {
|
||||
_uuid(): mock.Mock(),
|
||||
_uuid(): mock.Mock()}
|
||||
network_id = _uuid()
|
||||
agent._queue = mock.Mock()
|
||||
network = {'id': network_id}
|
||||
agent.network_update(None, network=network)
|
||||
self.assertEqual(2, agent._queue.add.call_count)
|
||||
|
||||
def test__process_network_update(self):
|
||||
router = l3_test_common.prepare_router_data(num_internal_ports=2)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent._process_added_router(router)
|
||||
|
@ -2585,10 +2596,27 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
internal_ports = ri.router.get(lib_constants.INTERFACE_KEY, [])
|
||||
network_id = internal_ports[0]['network_id']
|
||||
agent._queue = mock.Mock()
|
||||
network = {'id': network_id}
|
||||
agent.network_update(None, network=network)
|
||||
agent._process_network_update(ri.router_id, network_id)
|
||||
self.assertEqual(1, agent._queue.add.call_count)
|
||||
|
||||
def test__process_network_update_no_router_info_found(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
network_id = _uuid()
|
||||
agent._queue = mock.Mock()
|
||||
agent._process_network_update(_uuid(), network_id)
|
||||
agent._queue.add.assert_not_called()
|
||||
|
||||
def test__process_network_update_not_connected_to_router(self):
|
||||
router = l3_test_common.prepare_router_data(num_internal_ports=2)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent._process_added_router(router)
|
||||
ri = l3router.RouterInfo(agent, router['id'],
|
||||
router, **self.ri_kwargs)
|
||||
network_id = _uuid()
|
||||
agent._queue = mock.Mock()
|
||||
agent._process_network_update(ri.router_id, network_id)
|
||||
agent._queue.add.assert_not_called()
|
||||
|
||||
def test_create_router_namespace(self):
|
||||
self.mock_ip.ensure_namespace.return_value = self.mock_ip
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
|
@ -2720,7 +2748,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
update.resource = None
|
||||
agent._queue.each_update_to_next_resource.side_effect = [
|
||||
[(None, update)]]
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
self.assertFalse(agent.fullsync)
|
||||
self.assertEqual(ext_net_call,
|
||||
agent._process_router_if_compatible.called)
|
||||
|
@ -2747,13 +2775,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
resource=router,
|
||||
timestamp=timeutils.utcnow())
|
||||
agent._queue.add(update)
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
|
||||
# The update contained the router object, get_routers won't be called
|
||||
self.assertFalse(agent.plugin_rpc.get_routers.called)
|
||||
|
||||
# The update failed, assert that get_routers was called
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
self.assertTrue(agent.plugin_rpc.get_routers.called)
|
||||
|
||||
def test_process_routers_update_rpc_timeout_on_get_ext_net(self):
|
||||
|
@ -2777,7 +2805,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
agent.plugin_rpc.get_routers.side_effect = (
|
||||
Exception("Failed to get router info"))
|
||||
# start test
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
router_info.delete.assert_not_called()
|
||||
self.assertFalse(router_info.delete.called)
|
||||
self.assertTrue(agent.router_info)
|
||||
|
@ -2800,7 +2828,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
agent._safe_router_removed = mock.Mock()
|
||||
if error:
|
||||
agent._safe_router_removed.return_value = False
|
||||
agent._process_router_update()
|
||||
agent._process_update()
|
||||
if error:
|
||||
self.assertFalse(router_processor.fetched_and_processed.called)
|
||||
agent._resync_router.assert_called_with(update)
|
||||
|
|
Loading…
Reference in New Issue