Merge "L3 agent: paginate sync routers task"
This commit is contained in:
commit
ea8cafdfc0
|
@ -64,6 +64,11 @@ NS_PREFIX = namespaces.NS_PREFIX
|
|||
INTERNAL_DEV_PREFIX = namespaces.INTERNAL_DEV_PREFIX
|
||||
EXTERNAL_DEV_PREFIX = namespaces.EXTERNAL_DEV_PREFIX
|
||||
|
||||
# Number of routers to fetch from server at a time on resync.
|
||||
# Needed to reduce load on server side and to speed up resync on agent side.
|
||||
SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
|
||||
SYNC_ROUTERS_MIN_CHUNK_SIZE = 32
|
||||
|
||||
|
||||
class L3PluginApi(object):
|
||||
"""Agent side of the l3 agent RPC API.
|
||||
|
@ -83,6 +88,7 @@ class L3PluginApi(object):
|
|||
1.7 - DVR support: new L3 plugin methods added.
|
||||
- delete_agent_gateway_port
|
||||
1.8 - Added address scope information
|
||||
1.9 - Added get_router_ids
|
||||
"""
|
||||
|
||||
def __init__(self, topic, host):
|
||||
|
@ -96,6 +102,11 @@ class L3PluginApi(object):
|
|||
return cctxt.call(context, 'sync_routers', host=self.host,
|
||||
router_ids=router_ids)
|
||||
|
||||
def get_router_ids(self, context):
|
||||
"""Make a remote process call to retrieve scheduled routers ids."""
|
||||
cctxt = self.client.prepare(version='1.9')
|
||||
return cctxt.call(context, 'get_router_ids', host=self.host)
|
||||
|
||||
def get_external_network_id(self, context):
|
||||
"""Make a remote process call to retrieve the external network id.
|
||||
|
||||
|
@ -188,6 +199,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
|
|||
self.context = n_context.get_admin_context_without_session()
|
||||
self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
|
||||
self.fullsync = True
|
||||
self.sync_routers_chunk_size = SYNC_ROUTERS_MAX_CHUNK_SIZE
|
||||
|
||||
# Get the list of service plugins from Neutron Server
|
||||
# This is the first place where we contact neutron-server on startup
|
||||
|
@ -532,45 +544,68 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
|
|||
|
||||
def fetch_and_sync_all_routers(self, context, ns_manager):
|
||||
prev_router_ids = set(self.router_info)
|
||||
curr_router_ids = set()
|
||||
timestamp = timeutils.utcnow()
|
||||
|
||||
try:
|
||||
if self.conf.router_id:
|
||||
routers = self.plugin_rpc.get_routers(context,
|
||||
[self.conf.router_id])
|
||||
router_ids = ([self.conf.router_id] if self.conf.router_id else
|
||||
self.plugin_rpc.get_router_ids(context))
|
||||
# fetch routers by chunks to reduce the load on server and to
|
||||
# start router processing earlier
|
||||
for i in range(0, len(router_ids), self.sync_routers_chunk_size):
|
||||
routers = self.plugin_rpc.get_routers(
|
||||
context, router_ids[i:i + self.sync_routers_chunk_size])
|
||||
LOG.debug('Processing :%r', routers)
|
||||
for r in routers:
|
||||
curr_router_ids.add(r['id'])
|
||||
ns_manager.keep_router(r['id'])
|
||||
if r.get('distributed'):
|
||||
# need to keep fip namespaces as well
|
||||
ext_net_id = (r['external_gateway_info'] or {}).get(
|
||||
'network_id')
|
||||
if ext_net_id:
|
||||
ns_manager.keep_ext_net(ext_net_id)
|
||||
update = queue.RouterUpdate(
|
||||
r['id'],
|
||||
queue.PRIORITY_SYNC_ROUTERS_TASK,
|
||||
router=r,
|
||||
timestamp=timestamp)
|
||||
self._queue.add(update)
|
||||
except oslo_messaging.MessagingTimeout:
|
||||
if self.sync_routers_chunk_size > SYNC_ROUTERS_MIN_CHUNK_SIZE:
|
||||
self.sync_routers_chunk_size = max(
|
||||
self.sync_routers_chunk_size / 2,
|
||||
SYNC_ROUTERS_MIN_CHUNK_SIZE)
|
||||
LOG.error(_LE('Server failed to return info for routers in '
|
||||
'required time, decreasing chunk size to: %s'),
|
||||
self.sync_routers_chunk_size)
|
||||
else:
|
||||
routers = self.plugin_rpc.get_routers(context)
|
||||
LOG.error(_LE('Server failed to return info for routers in '
|
||||
'required time even with min chunk size: %s. '
|
||||
'It might be under very high load or '
|
||||
'just inoperable'),
|
||||
self.sync_routers_chunk_size)
|
||||
raise
|
||||
except oslo_messaging.MessagingException:
|
||||
LOG.exception(_LE("Failed synchronizing routers due to RPC error"))
|
||||
raise n_exc.AbortSyncRouters()
|
||||
else:
|
||||
LOG.debug('Processing :%r', routers)
|
||||
for r in routers:
|
||||
ns_manager.keep_router(r['id'])
|
||||
if r.get('distributed'):
|
||||
# need to keep fip namespaces as well
|
||||
ext_net_id = (r['external_gateway_info'] or {}).get(
|
||||
'network_id')
|
||||
if ext_net_id:
|
||||
ns_manager.keep_ext_net(ext_net_id)
|
||||
update = queue.RouterUpdate(r['id'],
|
||||
queue.PRIORITY_SYNC_ROUTERS_TASK,
|
||||
router=r,
|
||||
timestamp=timestamp)
|
||||
self._queue.add(update)
|
||||
self.fullsync = False
|
||||
LOG.debug("periodic_sync_routers_task successfully completed")
|
||||
|
||||
curr_router_ids = set([r['id'] for r in routers])
|
||||
self.fullsync = False
|
||||
LOG.debug("periodic_sync_routers_task successfully completed")
|
||||
# adjust chunk size after successful sync
|
||||
if self.sync_routers_chunk_size < SYNC_ROUTERS_MAX_CHUNK_SIZE:
|
||||
self.sync_routers_chunk_size = min(
|
||||
self.sync_routers_chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,
|
||||
SYNC_ROUTERS_MAX_CHUNK_SIZE)
|
||||
|
||||
# Delete routers that have disappeared since the last sync
|
||||
for router_id in prev_router_ids - curr_router_ids:
|
||||
ns_manager.keep_router(router_id)
|
||||
update = queue.RouterUpdate(router_id,
|
||||
queue.PRIORITY_SYNC_ROUTERS_TASK,
|
||||
timestamp=timestamp,
|
||||
action=queue.DELETE_ROUTER)
|
||||
self._queue.add(update)
|
||||
# Delete routers that have disappeared since the last sync
|
||||
for router_id in prev_router_ids - curr_router_ids:
|
||||
ns_manager.keep_router(router_id)
|
||||
update = queue.RouterUpdate(router_id,
|
||||
queue.PRIORITY_SYNC_ROUTERS_TASK,
|
||||
timestamp=timestamp,
|
||||
action=queue.DELETE_ROUTER)
|
||||
self._queue.add(update)
|
||||
|
||||
def after_start(self):
|
||||
# Note: the FWaaS' vArmourL3NATAgent is a subclass of L3NATAgent. It
|
||||
|
|
|
@ -46,7 +46,8 @@ class L3RpcCallback(object):
|
|||
# 1.6 Added process_prefix_update to support IPv6 Prefix Delegation
|
||||
# 1.7 Added method delete_agent_gateway_port for DVR Routers
|
||||
# 1.8 Added address scope information
|
||||
target = oslo_messaging.Target(version='1.8')
|
||||
# 1.9 Added get_router_ids
|
||||
target = oslo_messaging.Target(version='1.9')
|
||||
|
||||
@property
|
||||
def plugin(self):
|
||||
|
@ -61,6 +62,10 @@ class L3RpcCallback(object):
|
|||
plugin_constants.L3_ROUTER_NAT]
|
||||
return self._l3plugin
|
||||
|
||||
def get_router_ids(self, context, host):
|
||||
"""Returns IDs of routers scheduled to l3 agent on <host>"""
|
||||
return self.l3plugin.list_router_ids_on_host(context, host)
|
||||
|
||||
@db_api.retry_db_errors
|
||||
def sync_routers(self, context, **kwargs):
|
||||
"""Sync routers according to filters to a specific agent.
|
||||
|
|
|
@ -361,8 +361,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
|||
|
||||
return self.get_sync_data(context, router_ids=router_ids, active=True)
|
||||
|
||||
def list_active_sync_routers_on_active_l3_agent(
|
||||
self, context, host, router_ids):
|
||||
def list_router_ids_on_host(self, context, host, router_ids=None):
|
||||
agent = self._get_agent_by_type_and_host(
|
||||
context, constants.AGENT_TYPE_L3, host)
|
||||
if not agentschedulers_db.services_available(agent.admin_state_up):
|
||||
|
@ -374,8 +373,15 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
|||
if router_ids:
|
||||
query = query.filter(
|
||||
RouterL3AgentBinding.router_id.in_(router_ids))
|
||||
router_ids = [item[0] for item in query]
|
||||
|
||||
return [item[0] for item in query]
|
||||
|
||||
def list_active_sync_routers_on_active_l3_agent(
|
||||
self, context, host, router_ids):
|
||||
router_ids = self.list_router_ids_on_host(context, host, router_ids)
|
||||
if router_ids:
|
||||
agent = self._get_agent_by_type_and_host(
|
||||
context, constants.AGENT_TYPE_L3, host)
|
||||
return self._get_active_l3_agent_routers_sync_data(context, host,
|
||||
agent,
|
||||
router_ids)
|
||||
|
|
|
@ -112,6 +112,10 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
|
|||
deleted_routers_info.append(ri)
|
||||
ns_names_to_retrieve.add(ri.ns_name)
|
||||
|
||||
mocked_get_router_ids = self.mock_plugin_api.get_router_ids
|
||||
mocked_get_router_ids.return_value = [r['id'] for r in
|
||||
routers_to_keep +
|
||||
routers_deleted_during_resync]
|
||||
mocked_get_routers = self.mock_plugin_api.get_routers
|
||||
mocked_get_routers.return_value = (routers_to_keep +
|
||||
routers_deleted_during_resync)
|
||||
|
|
|
@ -200,6 +200,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
|
||||
def test_periodic_sync_routers_task_raise_exception(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
self.plugin_api.get_router_ids.return_value = ['fake_id']
|
||||
self.plugin_api.get_routers.side_effect = ValueError
|
||||
self.assertRaises(ValueError,
|
||||
agent.periodic_sync_routers_task,
|
||||
|
@ -247,6 +248,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
stale_router_ids = [_uuid(), _uuid()]
|
||||
active_routers = [{'id': _uuid()}, {'id': _uuid()}]
|
||||
self.plugin_api.get_router_ids.return_value = [r['id'] for r
|
||||
in active_routers]
|
||||
self.plugin_api.get_routers.return_value = active_routers
|
||||
namespace_list = [namespaces.NS_PREFIX + r_id
|
||||
for r_id in stale_router_ids]
|
||||
|
|
Loading…
Reference in New Issue