L3 agent: paginate sync routers task
In case there are thousands of routers attached to thousands of networks, a sync_routers request might take a long time and lead to a timeout on the agent side, causing the agent to initiate another resync. This may lead to an endless loop, overloading the server and leaving the agent unable to sync its state. This patch makes the l3 agent first check how many routers are assigned to it and then fetch the routers in chunks. The initial chunk size is set to 256 but may be decreased dynamically if timeouts occur while waiting for a response from the server. This approach reduces the load on the server side and speeds up the resync on the agent side, since processing starts right after the first chunk is received. Closes-Bug: #1516260 Change-Id: Id675910c2a0b862bfb9e6f4fdaf3cd9fe337e52f
This commit is contained in:
parent
904cdc723b
commit
0e97feb0f3
|
@ -64,6 +64,11 @@ NS_PREFIX = namespaces.NS_PREFIX
|
|||
INTERNAL_DEV_PREFIX = namespaces.INTERNAL_DEV_PREFIX
|
||||
EXTERNAL_DEV_PREFIX = namespaces.EXTERNAL_DEV_PREFIX
|
||||
|
||||
# Number of routers to fetch from server at a time on resync.
|
||||
# Needed to reduce load on server side and to speed up resync on agent side.
|
||||
SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
|
||||
SYNC_ROUTERS_MIN_CHUNK_SIZE = 32
|
||||
|
||||
|
||||
class L3PluginApi(object):
|
||||
"""Agent side of the l3 agent RPC API.
|
||||
|
@ -83,6 +88,7 @@ class L3PluginApi(object):
|
|||
1.7 - DVR support: new L3 plugin methods added.
|
||||
- delete_agent_gateway_port
|
||||
1.8 - Added address scope information
|
||||
1.9 - Added get_router_ids
|
||||
"""
|
||||
|
||||
def __init__(self, topic, host):
|
||||
|
@ -96,6 +102,11 @@ class L3PluginApi(object):
|
|||
return cctxt.call(context, 'sync_routers', host=self.host,
|
||||
router_ids=router_ids)
|
||||
|
||||
def get_router_ids(self, context):
|
||||
"""Make a remote process call to retrieve scheduled routers ids."""
|
||||
cctxt = self.client.prepare(version='1.9')
|
||||
return cctxt.call(context, 'get_router_ids', host=self.host)
|
||||
|
||||
def get_external_network_id(self, context):
|
||||
"""Make a remote process call to retrieve the external network id.
|
||||
|
||||
|
@ -188,6 +199,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
|
|||
self.context = n_context.get_admin_context_without_session()
|
||||
self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
|
||||
self.fullsync = True
|
||||
self.sync_routers_chunk_size = SYNC_ROUTERS_MAX_CHUNK_SIZE
|
||||
|
||||
# Get the list of service plugins from Neutron Server
|
||||
# This is the first place where we contact neutron-server on startup
|
||||
|
@ -532,45 +544,68 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
|
|||
|
||||
def fetch_and_sync_all_routers(self, context, ns_manager):
    """Fetch all routers scheduled to this agent and queue them for sync.

    Routers are fetched from the server in chunks of
    ``self.sync_routers_chunk_size`` to reduce server load and to start
    processing as soon as the first chunk arrives.  On an RPC timeout the
    chunk size is halved (down to SYNC_ROUTERS_MIN_CHUNK_SIZE) before the
    resync is retried; after a successful sync it is grown back toward
    SYNC_ROUTERS_MAX_CHUNK_SIZE.  Routers that were known before the sync
    but are no longer scheduled to this agent are queued for deletion.

    :param context: admin RPC context used for the server calls
    :param ns_manager: namespace manager; told which router/ext-net
        namespaces are still in use so stale ones can be cleaned up
    :raises n_exc.AbortSyncRouters: on a non-timeout RPC error
    :raises oslo_messaging.MessagingTimeout: re-raised after adjusting
        the chunk size so the caller retries the resync
    """
    prev_router_ids = set(self.router_info)
    curr_router_ids = set()
    timestamp = timeutils.utcnow()

    try:
        router_ids = ([self.conf.router_id] if self.conf.router_id else
                      self.plugin_rpc.get_router_ids(context))
        # fetch routers by chunks to reduce the load on server and to
        # start router processing earlier
        for i in range(0, len(router_ids), self.sync_routers_chunk_size):
            routers = self.plugin_rpc.get_routers(
                context, router_ids[i:i + self.sync_routers_chunk_size])
            LOG.debug('Processing :%r', routers)
            for r in routers:
                curr_router_ids.add(r['id'])
                ns_manager.keep_router(r['id'])
                if r.get('distributed'):
                    # need to keep fip namespaces as well
                    ext_net_id = (r['external_gateway_info'] or {}).get(
                        'network_id')
                    if ext_net_id:
                        ns_manager.keep_ext_net(ext_net_id)
                update = queue.RouterUpdate(
                    r['id'],
                    queue.PRIORITY_SYNC_ROUTERS_TASK,
                    router=r,
                    timestamp=timestamp)
                self._queue.add(update)
    except oslo_messaging.MessagingTimeout:
        if self.sync_routers_chunk_size > SYNC_ROUTERS_MIN_CHUNK_SIZE:
            # Use floor division so the chunk size stays an int under
            # py3; a float here would break the list slicing above.
            self.sync_routers_chunk_size = max(
                self.sync_routers_chunk_size // 2,
                SYNC_ROUTERS_MIN_CHUNK_SIZE)
            LOG.error(_LE('Server failed to return info for routers in '
                          'required time, decreasing chunk size to: %s'),
                      self.sync_routers_chunk_size)
        else:
            LOG.error(_LE('Server failed to return info for routers in '
                          'required time even with min chunk size: %s. '
                          'It might be under very high load or '
                          'just inoperable'),
                      self.sync_routers_chunk_size)
        # re-raise so the caller keeps fullsync set and retries
        raise
    except oslo_messaging.MessagingException:
        LOG.exception(_LE("Failed synchronizing routers due to RPC error"))
        raise n_exc.AbortSyncRouters()

    self.fullsync = False
    LOG.debug("periodic_sync_routers_task successfully completed")
    # adjust chunk size after successful sync
    if self.sync_routers_chunk_size < SYNC_ROUTERS_MAX_CHUNK_SIZE:
        self.sync_routers_chunk_size = min(
            self.sync_routers_chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,
            SYNC_ROUTERS_MAX_CHUNK_SIZE)

    # Delete routers that have disappeared since the last sync
    for router_id in prev_router_ids - curr_router_ids:
        ns_manager.keep_router(router_id)
        update = queue.RouterUpdate(router_id,
                                    queue.PRIORITY_SYNC_ROUTERS_TASK,
                                    timestamp=timestamp,
                                    action=queue.DELETE_ROUTER)
        self._queue.add(update)
|
||||
|
||||
def after_start(self):
|
||||
# Note: the FWaaS' vArmourL3NATAgent is a subclass of L3NATAgent. It
|
||||
|
|
|
@ -46,7 +46,8 @@ class L3RpcCallback(object):
|
|||
# 1.6 Added process_prefix_update to support IPv6 Prefix Delegation
|
||||
# 1.7 Added method delete_agent_gateway_port for DVR Routers
|
||||
# 1.8 Added address scope information
|
||||
target = oslo_messaging.Target(version='1.8')
|
||||
# 1.9 Added get_router_ids
|
||||
target = oslo_messaging.Target(version='1.9')
|
||||
|
||||
@property
|
||||
def plugin(self):
|
||||
|
@ -61,6 +62,10 @@ class L3RpcCallback(object):
|
|||
plugin_constants.L3_ROUTER_NAT]
|
||||
return self._l3plugin
|
||||
|
||||
def get_router_ids(self, context, host):
    """Return the IDs of the routers scheduled to the l3 agent on <host>."""
    plugin = self.l3plugin
    return plugin.list_router_ids_on_host(context, host)
|
||||
|
||||
@db_api.retry_db_errors
|
||||
def sync_routers(self, context, **kwargs):
|
||||
"""Sync routers according to filters to a specific agent.
|
||||
|
|
|
@ -370,8 +370,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
|||
|
||||
return self.get_sync_data(context, router_ids=router_ids, active=True)
|
||||
|
||||
def list_active_sync_routers_on_active_l3_agent(
|
||||
self, context, host, router_ids):
|
||||
def list_router_ids_on_host(self, context, host, router_ids=None):
|
||||
agent = self._get_agent_by_type_and_host(
|
||||
context, constants.AGENT_TYPE_L3, host)
|
||||
if not agentschedulers_db.services_available(agent.admin_state_up):
|
||||
|
@ -383,8 +382,15 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
|||
if router_ids:
|
||||
query = query.filter(
|
||||
RouterL3AgentBinding.router_id.in_(router_ids))
|
||||
router_ids = [item[0] for item in query]
|
||||
|
||||
return [item[0] for item in query]
|
||||
|
||||
def list_active_sync_routers_on_active_l3_agent(
|
||||
self, context, host, router_ids):
|
||||
router_ids = self.list_router_ids_on_host(context, host, router_ids)
|
||||
if router_ids:
|
||||
agent = self._get_agent_by_type_and_host(
|
||||
context, constants.AGENT_TYPE_L3, host)
|
||||
return self._get_active_l3_agent_routers_sync_data(context, host,
|
||||
agent,
|
||||
router_ids)
|
||||
|
|
|
@ -90,6 +90,10 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
|
|||
deleted_routers_info.append(ri)
|
||||
ns_names_to_retrieve.add(ri.ns_name)
|
||||
|
||||
mocked_get_router_ids = self.mock_plugin_api.get_router_ids
|
||||
mocked_get_router_ids.return_value = [r['id'] for r in
|
||||
routers_to_keep +
|
||||
routers_deleted_during_resync]
|
||||
mocked_get_routers = self.mock_plugin_api.get_routers
|
||||
mocked_get_routers.return_value = (routers_to_keep +
|
||||
routers_deleted_during_resync)
|
||||
|
|
|
@ -200,6 +200,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
|
||||
def test_periodic_sync_routers_task_raise_exception(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
self.plugin_api.get_router_ids.return_value = ['fake_id']
|
||||
self.plugin_api.get_routers.side_effect = ValueError
|
||||
self.assertRaises(ValueError,
|
||||
agent.periodic_sync_routers_task,
|
||||
|
@ -247,6 +248,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
stale_router_ids = [_uuid(), _uuid()]
|
||||
active_routers = [{'id': _uuid()}, {'id': _uuid()}]
|
||||
self.plugin_api.get_router_ids.return_value = [r['id'] for r
|
||||
in active_routers]
|
||||
self.plugin_api.get_routers.return_value = active_routers
|
||||
namespace_list = [namespaces.NS_PREFIX + r_id
|
||||
for r_id in stale_router_ids]
|
||||
|
|
Loading…
Reference in New Issue