neutron/neutron/agent/l3/agent.py

# Copyright 2012 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import eventlet
import netaddr
from neutron_lib.agent import constants as agent_consts
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as lib_const
from neutron_lib import context as n_context
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_context import context as common_context
from oslo_log import log as logging
import oslo_messaging
from oslo_serialization import jsonutils
from oslo_service import loopingcall
from oslo_service import periodic_task
from oslo_utils import excutils
from oslo_utils import timeutils
from osprofiler import profiler
from neutron._i18n import _
from neutron.agent.common import resource_processing_queue as queue
from neutron.agent.common import utils as common_utils
from neutron.agent.l3 import dvr
from neutron.agent.l3 import dvr_edge_ha_router
from neutron.agent.l3 import dvr_edge_router as dvr_router
from neutron.agent.l3 import dvr_local_router
from neutron.agent.l3 import ha
from neutron.agent.l3 import ha_router
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
from neutron.agent.l3 import l3_agent_extensions_manager as l3_ext_manager
from neutron.agent.l3 import legacy_router
from neutron.agent.l3 import namespace_manager
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import pd
from neutron.agent.metadata import driver as metadata_driver
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron import manager
LOG = logging.getLogger(__name__)
# Number of routers to fetch from server at a time on resync.
# Needed to reduce load on server side and to speed up resync on agent side.
SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
SYNC_ROUTERS_MIN_CHUNK_SIZE = 32
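# Lower and upper bounds for the green pool used to process router updates.
# The pool is resized between these bounds to track the number of routers
# hosted by this agent (see L3NATAgent._resize_process_pool()).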
ROUTER_PROCESS_GREENLET_MAX = 32
ROUTER_PROCESS_GREENLET_MIN = 8
def log_verbose_exc(message, router_payload):
LOG.exception(message)
LOG.debug("Payload:\n%s",
utils.DelayedStringRenderer(jsonutils.dumps,
router_payload, indent=5))
class L3PluginApi(object):
"""Agent side of the l3 agent RPC API.
API version history:
1.0 - Initial version.
1.1 - Floating IP operational status updates
1.2 - DVR support: new L3 plugin methods added.
- get_ports_by_subnet
- get_agent_gateway_port
Needed by the agent when operating in DVR/DVR_SNAT mode
1.3 - Get the list of activated services
        1.4 - Added L3 HA update_router_state. This method was reworked
              in 1.5 into update_ha_routers_states
1.5 - Added update_ha_routers_states
1.6 - Added process_prefix_update
1.7 - DVR support: new L3 plugin methods added.
- delete_agent_gateway_port
1.8 - Added address scope information
1.9 - Added get_router_ids
        1.10 - Added update_all_ha_network_port_statuses
"""
def __init__(self, topic, host):
self.host = host
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def get_routers(self, context, router_ids=None):
"""Make a remote process call to retrieve the sync data for routers."""
cctxt = self.client.prepare()
return cctxt.call(context, 'sync_routers', host=self.host,
router_ids=router_ids)
def update_all_ha_network_port_statuses(self, context):
"""Make a remote process call to update HA network port status."""
cctxt = self.client.prepare(version='1.10')
return cctxt.call(context, 'update_all_ha_network_port_statuses',
host=self.host)
def get_router_ids(self, context):
"""Make a remote process call to retrieve scheduled routers ids."""
cctxt = self.client.prepare(version='1.9')
return cctxt.call(context, 'get_router_ids', host=self.host)
def get_external_network_id(self, context):
"""Make a remote process call to retrieve the external network id.
        @raise oslo_messaging.RemoteError: with TooManyExternalNetworks as
                                           exc_type if there is more than
                                           one external network
"""
cctxt = self.client.prepare()
return cctxt.call(context, 'get_external_network_id', host=self.host)
def update_floatingip_statuses(self, context, router_id, fip_statuses):
"""Call the plugin update floating IPs's operational status."""
cctxt = self.client.prepare(version='1.1')
return cctxt.call(context, 'update_floatingip_statuses',
router_id=router_id, fip_statuses=fip_statuses)
def get_ports_by_subnet(self, context, subnet_id):
"""Retrieve ports by subnet id."""
cctxt = self.client.prepare(version='1.2')
return cctxt.call(context, 'get_ports_by_subnet', host=self.host,
subnet_id=subnet_id)
def get_agent_gateway_port(self, context, fip_net):
"""Get or create an agent_gateway_port."""
cctxt = self.client.prepare(version='1.2')
return cctxt.call(context, 'get_agent_gateway_port',
network_id=fip_net, host=self.host)
def get_service_plugin_list(self, context):
"""Make a call to get the list of activated services."""
cctxt = self.client.prepare(version='1.3')
return cctxt.call(context, 'get_service_plugin_list')
def update_ha_routers_states(self, context, states):
"""Update HA routers states."""
cctxt = self.client.prepare(version='1.5')
return cctxt.cast(context, 'update_ha_routers_states',
host=self.host, states=states)
def process_prefix_update(self, context, prefix_update):
"""Process prefix update whenever prefixes get changed."""
cctxt = self.client.prepare(version='1.6')
return cctxt.call(context, 'process_prefix_update',
subnets=prefix_update)
def delete_agent_gateway_port(self, context, fip_net):
"""Delete Floatingip_agent_gateway_port."""
cctxt = self.client.prepare(version='1.7')
return cctxt.call(context, 'delete_agent_gateway_port',
host=self.host, network_id=fip_net)
@profiler.trace_cls("l3-agent")
class L3NATAgent(ha.AgentMixin,
dvr.AgentMixin,
manager.Manager):
"""Manager for L3NatAgent
API version history:
        1.0 - Initial version.
        1.1 - Changed the type of the routers parameter
to the routers_updated method.
It was previously a list of routers in dict format.
It is now a list of router IDs only.
Per rpc versioning rules, it is backwards compatible.
1.2 - DVR support: new L3 agent methods added.
- add_arp_entry
- del_arp_entry
1.3 - fipnamespace_delete_on_ext_net - to delete fipnamespace
after the external network is removed
Needed by the L3 service when dealing with DVR
1.4 - support network_update to get MTU updates
"""
target = oslo_messaging.Target(version='1.4')
def __init__(self, host, conf=None):
if conf:
self.conf = conf
else:
self.conf = cfg.CONF
self.router_info = {}
self._check_config_params()
self.process_monitor = external_process.ProcessMonitor(
config=self.conf,
resource_type='router')
self.driver = common_utils.load_interface_driver(self.conf)
self._context = n_context.get_admin_context_without_session()
self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
self.fullsync = True
self.sync_routers_chunk_size = SYNC_ROUTERS_MAX_CHUNK_SIZE
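        # Start with the largest chunk size; fetch_and_sync_all_routers()
        # halves it on MessagingTimeout and grows it back in
        # SYNC_ROUTERS_MIN_CHUNK_SIZE steps after successful syncs.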
# Get the list of service plugins from Neutron Server
# This is the first place where we contact neutron-server on startup
        # so retry in case it's not ready to respond.
while True:
try:
self.neutron_service_plugins = (
self.plugin_rpc.get_service_plugin_list(self.context))
except oslo_messaging.MessagingTimeout as e:
LOG.warning('l3-agent cannot contact neutron server '
                            'to retrieve the enabled service plugins. '
'Check connectivity to neutron server. '
'Retrying... '
'Detailed message: %(msg)s.', {'msg': e})
continue
break
self.init_extension_manager(self.plugin_rpc)
self.metadata_driver = None
if self.conf.enable_metadata_proxy:
self.metadata_driver = metadata_driver.MetadataDriver(self)
self.namespaces_manager = namespace_manager.NamespaceManager(
self.conf,
self.driver,
self.metadata_driver)
# L3 agent router processing green pool
self._pool = eventlet.GreenPool(size=ROUTER_PROCESS_GREENLET_MIN)
self._queue = queue.ResourceProcessingQueue()
super(L3NATAgent, self).__init__(host=self.conf.host)
self.target_ex_net_id = None
self.use_ipv6 = ipv6_utils.is_enabled_and_bind_by_default()
self.pd = pd.PrefixDelegation(self.context, self.process_monitor,
self.driver,
self.plugin_rpc.process_prefix_update,
self.create_pd_router_update,
self.conf)
# Consume network updates to trigger router resync
consumers = [[topics.NETWORK, topics.UPDATE]]
agent_rpc.create_consumers([self], topics.AGENT, consumers)
        # We set the HA network port status to DOWN to let the l2 agent
        # update it to ACTIVE after wiring. This allows us to spawn
        # keepalived only after the l2 agent has finished wiring the port.
try:
self.plugin_rpc.update_all_ha_network_port_statuses(self.context)
except Exception:
LOG.exception('update_all_ha_network_port_statuses failed')
def _check_config_params(self):
"""Check items in configuration files.
Check for required and invalid configuration items.
The actual values are not verified for correctness.
"""
if not self.conf.interface_driver:
msg = 'An interface driver must be specified'
LOG.error(msg)
raise SystemExit(1)
if self.conf.ipv6_gateway:
# ipv6_gateway configured. Check for valid v6 link-local address.
try:
msg = ("%s used in config as ipv6_gateway is not a valid "
"IPv6 link-local address.")
ip_addr = netaddr.IPAddress(self.conf.ipv6_gateway)
if ip_addr.version != 6 or not ip_addr.is_link_local():
LOG.error(msg, self.conf.ipv6_gateway)
raise SystemExit(1)
except netaddr.AddrFormatError:
LOG.error(msg, self.conf.ipv6_gateway)
raise SystemExit(1)
def _fetch_external_net_id(self, force=False):
"""Find UUID of single external network for this agent."""
if self.conf.gateway_external_network_id:
return self.conf.gateway_external_network_id
# L3 agent doesn't use external_network_bridge to handle external
# networks, so bridge_mappings with provider networks will be used
# and the L3 agent is able to handle any external networks.
if not self.conf.external_network_bridge:
return
if not force and self.target_ex_net_id:
return self.target_ex_net_id
try:
self.target_ex_net_id = self.plugin_rpc.get_external_network_id(
self.context)
return self.target_ex_net_id
except oslo_messaging.RemoteError as e:
with excutils.save_and_reraise_exception() as ctx:
if e.exc_type == 'TooManyExternalNetworks':
ctx.reraise = False
msg = _(
"The 'gateway_external_network_id' option must be "
"configured for this agent as Neutron has more than "
"one external network.")
raise Exception(msg)
def _create_router(self, router_id, router):
args = []
kwargs = {
'agent': self,
'router_id': router_id,
'router': router,
'use_ipv6': self.use_ipv6,
'agent_conf': self.conf,
'interface_driver': self.driver,
}
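        # Router class dispatch, based on the router flags and this agent's
        # mode: DVR+HA on a dvr_snat agent -> DvrEdgeHaRouter, DVR on a
        # dvr_snat agent -> DvrEdgeRouter, DVR elsewhere -> DvrLocalRouter,
        # HA -> HaRouter, otherwise -> LegacyRouter.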
if router.get('distributed'):
kwargs['host'] = self.host
if router.get('distributed') and router.get('ha'):
# Case 1: If the router contains information about the HA interface
# and if the requesting agent is a DVR_SNAT agent then go ahead
# and create a HA router.
# Case 2: If the router does not contain information about the HA
# interface this means that this DVR+HA router needs to host only
# the edge side of it, typically because it's landing on a node
# that needs to provision a router namespace because of a DVR
# service port (e.g. DHCP). So go ahead and create a regular DVR
# edge router.
if (self.conf.agent_mode == lib_const.L3_AGENT_MODE_DVR_SNAT and
router.get(lib_const.HA_INTERFACE_KEY) is not None):
kwargs['state_change_callback'] = self.enqueue_state_change
return dvr_edge_ha_router.DvrEdgeHaRouter(*args, **kwargs)
if router.get('distributed'):
if self.conf.agent_mode == lib_const.L3_AGENT_MODE_DVR_SNAT:
return dvr_router.DvrEdgeRouter(*args, **kwargs)
else:
return dvr_local_router.DvrLocalRouter(*args, **kwargs)
if router.get('ha'):
kwargs['state_change_callback'] = self.enqueue_state_change
return ha_router.HaRouter(*args, **kwargs)
return legacy_router.LegacyRouter(*args, **kwargs)
@lockutils.synchronized('resize_greenpool')
def _resize_process_pool(self):
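        """Resize the router processing green pool.

        The pool size follows the number of routers hosted by this agent,
        clamped to the [ROUTER_PROCESS_GREENLET_MIN,
        ROUTER_PROCESS_GREENLET_MAX] range.
        """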
self._pool_size = max([ROUTER_PROCESS_GREENLET_MIN,
min([ROUTER_PROCESS_GREENLET_MAX,
len(self.router_info)])])
LOG.info("Resizing router processing queue green pool size to: %d",
self._pool_size)
self._pool.resize(self._pool_size)
def _router_added(self, router_id, router):
ri = self._create_router(router_id, router)
registry.notify(resources.ROUTER, events.BEFORE_CREATE,
self, router=ri)
self.router_info[router_id] = ri
        # If initialize() fails, clean up and retrigger a complete sync
try:
ri.initialize(self.process_monitor)
except Exception:
with excutils.save_and_reraise_exception():
del self.router_info[router_id]
LOG.exception('Error while initializing router %s',
router_id)
self.namespaces_manager.ensure_router_cleanup(router_id)
try:
ri.delete()
except Exception:
LOG.exception('Error while deleting router %s',
router_id)
self._resize_process_pool()
def _safe_router_removed(self, router_id):
"""Try to delete a router and return True if successful."""
try:
self._router_removed(router_id)
self.l3_ext_manager.delete_router(self.context, router_id)
except Exception:
LOG.exception('Error while deleting router %s', router_id)
return False
else:
return True
def _router_removed(self, router_id):
"""Delete the router and stop the auxiliary processes
        This stops the auxiliary processes (keepalived, keepalived-state-
        change, radvd, etc.) and deletes the router ports and the namespace.
        The "router_info" cache is also updated at the beginning of the
        process, to prevent any other concurrent process from handling the
        router being deleted. If an exception is raised, the "router_info"
        cache is restored.
"""
ri = self.router_info.get(router_id)
if ri is None:
LOG.warning("Info for router %s was not found. "
"Performing router cleanup", router_id)
self.namespaces_manager.ensure_router_cleanup(router_id)
return
registry.notify(resources.ROUTER, events.BEFORE_DELETE,
self, router=ri)
del self.router_info[router_id]
try:
ri.delete()
except Exception:
with excutils.save_and_reraise_exception():
self.router_info[router_id] = ri
registry.notify(resources.ROUTER, events.AFTER_DELETE, self, router=ri)
self._resize_process_pool()
def init_extension_manager(self, connection):
l3_ext_manager.register_opts(self.conf)
self.agent_api = l3_ext_api.L3AgentExtensionAPI(self.router_info)
self.l3_ext_manager = (
l3_ext_manager.L3AgentExtensionsManager(self.conf))
self.l3_ext_manager.initialize(
connection, lib_const.L3_AGENT_MODE,
self.agent_api)
def router_deleted(self, context, router_id):
"""Deal with router deletion RPC message."""
LOG.debug('Got router deleted notification for %s', router_id)
update = queue.ResourceUpdate(router_id,
queue.PRIORITY_RPC,
action=queue.DELETE_ROUTER)
self._queue.add(update)
def routers_updated(self, context, routers):
"""Deal with routers modification and creation RPC message."""
        LOG.debug('Got routers updated notification: %s', routers)
if routers:
# This is needed for backward compatibility
if isinstance(routers[0], dict):
routers = [router['id'] for router in routers]
            for router_id in routers:
                update = queue.ResourceUpdate(
                    router_id, queue.PRIORITY_RPC,
                    action=queue.ADD_UPDATE_ROUTER)
self._queue.add(update)
def router_removed_from_agent(self, context, payload):
        LOG.debug('Got router removed from agent: %r', payload)
router_id = payload['router_id']
update = queue.ResourceUpdate(router_id,
queue.PRIORITY_RPC,
action=queue.DELETE_ROUTER)
self._queue.add(update)
def router_added_to_agent(self, context, payload):
        LOG.debug('Got router added to agent: %r', payload)
self.routers_updated(context, payload)
def network_update(self, context, **kwargs):
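        # A network update (e.g. an MTU change, see RPC API version 1.4)
        # triggers a resync of every router that has a port on that network.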
network_id = kwargs['network']['id']
for ri in self.router_info.values():
ports = list(ri.internal_ports)
if ri.ex_gw_port:
ports.append(ri.ex_gw_port)
            if any(p['network_id'] == network_id for p in ports):
update = queue.ResourceUpdate(
ri.router_id, queue.PRIORITY_SYNC_ROUTERS_TASK)
self._resync_router(update)
def _process_router_if_compatible(self, router):
if (self.conf.external_network_bridge and
not ip_lib.device_exists(self.conf.external_network_bridge)):
LOG.error("The external network bridge '%s' does not exist",
self.conf.external_network_bridge)
return
# Either ex_net_id or handle_internal_only_routers must be set
ex_net_id = (router['external_gateway_info'] or {}).get('network_id')
if not ex_net_id and not self.conf.handle_internal_only_routers:
raise n_exc.RouterNotCompatibleWithAgent(router_id=router['id'])
# If target_ex_net_id and ex_net_id are set they must be equal
target_ex_net_id = self._fetch_external_net_id()
if (target_ex_net_id and ex_net_id and ex_net_id != target_ex_net_id):
# Double check that our single external_net_id has not changed
# by forcing a check by RPC.
if ex_net_id != self._fetch_external_net_id(force=True):
raise n_exc.RouterNotCompatibleWithAgent(
router_id=router['id'])
if router['id'] not in self.router_info:
self._process_added_router(router)
else:
self._process_updated_router(router)
def _process_added_router(self, router):
self._router_added(router['id'], router)
ri = self.router_info[router['id']]
ri.router = router
ri.process()
registry.notify(resources.ROUTER, events.AFTER_CREATE, self, router=ri)
self.l3_ext_manager.add_router(self.context, router)
def _process_updated_router(self, router):
ri = self.router_info[router['id']]
is_dvr_snat_agent = (self.conf.agent_mode ==
lib_const.L3_AGENT_MODE_DVR_SNAT)
is_dvr_only_agent = (self.conf.agent_mode in
[lib_const.L3_AGENT_MODE_DVR,
lib_const.L3_AGENT_MODE_DVR_NO_EXTERNAL])
old_router_ha_interface = ri.router.get(lib_const.HA_INTERFACE_KEY)
current_router_ha_interface = router.get(lib_const.HA_INTERFACE_KEY)
ha_interface_change = ((old_router_ha_interface is None and
current_router_ha_interface is not None) or
(old_router_ha_interface is not None and
current_router_ha_interface is None))
is_dvr_ha_router = router.get('distributed') and router.get('ha')
if is_dvr_snat_agent and is_dvr_ha_router and ha_interface_change:
LOG.debug("Removing HA router %s, since it is not bound to "
"the current agent, and recreating regular DVR router "
"based on service port requirements.",
router['id'])
if self._safe_router_removed(router['id']):
self._process_added_router(router)
else:
is_ha_router = getattr(ri, 'ha_state', False)
# For HA routers check that DB state matches actual state
if router.get('ha') and not is_dvr_only_agent and is_ha_router:
self.check_ha_state_for_router(
router['id'], router.get(l3_constants.HA_ROUTER_STATE_KEY))
ri.router = router
registry.notify(resources.ROUTER, events.BEFORE_UPDATE,
self, router=ri)
ri.process()
registry.notify(
resources.ROUTER, events.AFTER_UPDATE, self, router=ri)
self.l3_ext_manager.update_router(self.context, router)
def _resync_router(self, router_update,
priority=queue.PRIORITY_SYNC_ROUTERS_TASK):
# Don't keep trying to resync if it's failing
if router_update.hit_retry_limit():
LOG.warning("Hit retry limit with router update for %s, action %s",
router_update.id, router_update.action)
if router_update.action != queue.DELETE_ROUTER:
LOG.debug("Deleting router %s", router_update.id)
self._safe_router_removed(router_update.id)
return
router_update.timestamp = timeutils.utcnow()
router_update.priority = priority
router_update.resource = None # Force the agent to resync the router
self._queue.add(router_update)
def _process_router_update(self):
for rp, update in self._queue.each_update_to_next_resource():
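            # Each iteration acquires exclusive processing of a single
            # router and yields its next queued update; "rp" records which
            # updates have been fetched and processed (see
            # rp.fetched_and_processed() below) so stale events are skipped.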
LOG.debug("Starting router update for %s, action %s, priority %s",
update.id, update.action, update.priority)
if update.action == queue.PD_UPDATE:
self.pd.process_prefix_update()
LOG.debug("Finished a router update for %s", update.id)
continue
routers = [update.resource] if update.resource else []
not_delete_no_routers = (update.action != queue.DELETE_ROUTER and
not routers)
related_action = update.action in (queue.DELETE_RELATED_ROUTER,
queue.ADD_UPDATE_RELATED_ROUTER)
if not_delete_no_routers or related_action:
try:
update.timestamp = timeutils.utcnow()
routers = self.plugin_rpc.get_routers(self.context,
[update.id])
except Exception:
msg = "Failed to fetch router information for '%s'"
LOG.exception(msg, update.id)
self._resync_router(update)
continue
# For a related action, verify the router is still hosted here,
# since it could have just been deleted and we don't want to
# add it back.
if related_action:
routers = [r for r in routers if r['id'] == update.id]
if not routers:
removed = self._safe_router_removed(update.id)
if not removed:
self._resync_router(update)
else:
                    # Need to update the timestamp of the removed router in
                    # case there are older events for the same router in the
                    # processing queue (like events from a fullsync), in
                    # order to prevent the deleted router from being
                    # re-created.
rp.fetched_and_processed(update.timestamp)
LOG.debug("Finished a router update for %s", update.id)
continue
if not self._process_routers_if_compatible(routers, update):
self._resync_router(update)
continue
rp.fetched_and_processed(update.timestamp)
LOG.debug("Finished a router update for %s", update.id)
def _process_routers_if_compatible(self, routers, update):
process_result = True
for router in routers:
if router['id'] != update.id:
# Don't do the work here, instead create a new update and
# enqueue it, since there could be another thread working
# on it already and we don't want to race.
new_action = queue.RELATED_ACTION_MAP.get(
update.action, queue.ADD_UPDATE_RELATED_ROUTER)
new_update = queue.ResourceUpdate(
router['id'],
priority=queue.PRIORITY_RELATED_ROUTER,
action=new_action)
self._queue.add(new_update)
LOG.debug('Queued a router update for %(router_id)s '
'(related router %(related_router_id)s). '
'Original event action %(action)s, '
'priority %(priority)s. '
'New event action %(new_action)s, '
'priority %(new_priority)s',
{'router_id': router['id'],
'related_router_id': update.id,
'action': update.action,
'priority': update.priority,
'new_action': new_update.action,
'new_priority': new_update.priority})
continue
try:
self._process_router_if_compatible(router)
except n_exc.RouterNotCompatibleWithAgent as e:
log_verbose_exc(e.msg, router)
# Was the router previously handled by this agent?
if router['id'] in self.router_info:
LOG.error("Removing incompatible router '%s'",
router['id'])
self._safe_router_removed(router['id'])
except Exception:
log_verbose_exc(
"Failed to process compatible router: %s" % update.id,
router)
process_result = False
return process_result
def _process_routers_loop(self):
LOG.debug("Starting _process_routers_loop")
while True:
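            # GreenPool.spawn_n() blocks when the pool is at capacity, so at
            # most _pool_size updates are processed concurrently.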
self._pool.spawn_n(self._process_router_update)
# NOTE(kevinbenton): this is set to 1 second because the actual interval
# is controlled by a FixedIntervalLoopingCall in neutron/service.py that
# is responsible for task execution.
@periodic_task.periodic_task(spacing=1, run_immediately=True)
def periodic_sync_routers_task(self, context):
if not self.fullsync:
return
LOG.debug("Starting fullsync periodic_sync_routers_task")
# self.fullsync is True at this point. If an exception -- caught or
# uncaught -- prevents setting it to False below then the next call
# to periodic_sync_routers_task will re-enter this code and try again.
# Context manager self.namespaces_manager captures a picture of
# namespaces *before* fetch_and_sync_all_routers fetches the full list
# of routers from the database. This is important to correctly
# identify stale ones.
try:
with self.namespaces_manager as ns_manager:
self.fetch_and_sync_all_routers(context, ns_manager)
except n_exc.AbortSyncRouters:
self.fullsync = True
def fetch_and_sync_all_routers(self, context, ns_manager):
prev_router_ids = set(self.router_info)
curr_router_ids = set()
timestamp = timeutils.utcnow()
router_ids = []
chunk = []
is_snat_agent = (self.conf.agent_mode ==
lib_const.L3_AGENT_MODE_DVR_SNAT)
try:
router_ids = self.plugin_rpc.get_router_ids(context)
            # Fetch routers in chunks to reduce the load on the server and
            # to start router processing earlier.
for i in range(0, len(router_ids), self.sync_routers_chunk_size):
chunk = router_ids[i:i + self.sync_routers_chunk_size]
routers = self.plugin_rpc.get_routers(context, chunk)
                LOG.debug('Processing: %r', routers)
for r in routers:
curr_router_ids.add(r['id'])
ns_manager.keep_router(r['id'])
if r.get('distributed'):
# need to keep fip namespaces as well
ext_net_id = (r['external_gateway_info'] or {}).get(
'network_id')
if ext_net_id:
ns_manager.keep_ext_net(ext_net_id)
elif is_snat_agent and not r.get('ha'):
ns_manager.ensure_snat_cleanup(r['id'])
update = queue.ResourceUpdate(
r['id'],
queue.PRIORITY_SYNC_ROUTERS_TASK,
resource=r,
action=queue.ADD_UPDATE_ROUTER,
timestamp=timestamp)
self._queue.add(update)
except oslo_messaging.MessagingTimeout:
if self.sync_routers_chunk_size > SYNC_ROUTERS_MIN_CHUNK_SIZE:
                # Use integer division so the chunk size stays an int.
                self.sync_routers_chunk_size = max(
                    self.sync_routers_chunk_size // 2,
                    SYNC_ROUTERS_MIN_CHUNK_SIZE)
LOG.error('Server failed to return info for routers in '
'required time, decreasing chunk size to: %s',
self.sync_routers_chunk_size)
else:
LOG.error('Server failed to return info for routers in '
'required time even with min chunk size: %s. '
'It might be under very high load or '
'just inoperable',
self.sync_routers_chunk_size)
raise
except oslo_messaging.MessagingException:
failed_routers = chunk or router_ids
LOG.exception("Failed synchronizing routers '%s' "
"due to RPC error", failed_routers)
raise n_exc.AbortSyncRouters()
self.fullsync = False
LOG.debug("periodic_sync_routers_task successfully completed")
# adjust chunk size after successful sync
if self.sync_routers_chunk_size < SYNC_ROUTERS_MAX_CHUNK_SIZE:
self.sync_routers_chunk_size = min(
self.sync_routers_chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,
SYNC_ROUTERS_MAX_CHUNK_SIZE)
# Delete routers that have disappeared since the last sync
for router_id in prev_router_ids - curr_router_ids:
ns_manager.keep_router(router_id)
update = queue.ResourceUpdate(router_id,
queue.PRIORITY_SYNC_ROUTERS_TASK,
timestamp=timestamp,
action=queue.DELETE_ROUTER)
self._queue.add(update)
@property
def context(self):
# generate a new request-id on each call to make server side tracking
# of RPC calls easier.
self._context.request_id = common_context.generate_request_id()
return self._context
def after_start(self):
        # Note: the FWaaS vArmourL3NATAgent is a subclass of L3NATAgent and
        # calls this method, so removing this after_start() would break
        # vArmourL3NATAgent. We need to find out whether vArmourL3NATAgent
        # can have L3NATAgentWithStateReport as its base class instead of
        # L3NATAgent.
eventlet.spawn_n(self._process_routers_loop)
LOG.info("L3 agent started")
def create_pd_router_update(self):
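        # Prefix delegation updates are not tied to a specific router, so
        # the update is enqueued without a router id, at a dedicated
        # priority and with the PD_UPDATE action.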
router_id = None
update = queue.ResourceUpdate(router_id,
queue.PRIORITY_PD_UPDATE,
timestamp=timeutils.utcnow(),
action=queue.PD_UPDATE)
self._queue.add(update)
class L3NATAgentWithStateReport(L3NATAgent):
def __init__(self, host, conf=None):
super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
self.agent_state = {
'binary': 'neutron-l3-agent',
'host': host,
'availability_zone': self.conf.AGENT.availability_zone,
'topic': topics.L3_AGENT,
'configurations': {
'agent_mode': self.conf.agent_mode,
'handle_internal_only_routers':
self.conf.handle_internal_only_routers,
'external_network_bridge': self.conf.external_network_bridge,
'gateway_external_network_id':
self.conf.gateway_external_network_id,
'interface_driver': self.conf.interface_driver,
'log_agent_heartbeats': self.conf.AGENT.log_agent_heartbeats,
'extensions': self.l3_ext_manager.names()},
'start_flag': True,
'agent_type': lib_const.AGENT_TYPE_L3}
report_interval = self.conf.AGENT.report_interval
if report_interval:
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
self.heartbeat.start(interval=report_interval)
def _report_state(self):
num_ex_gw_ports = 0
num_interfaces = 0
num_floating_ips = 0
router_infos = self.router_info.values()
num_routers = len(router_infos)
for ri in router_infos:
ex_gw_port = ri.get_ex_gw_port()
if ex_gw_port:
num_ex_gw_ports += 1
num_interfaces += len(ri.router.get(lib_const.INTERFACE_KEY,
[]))
num_floating_ips += len(ri.router.get(lib_const.FLOATINGIP_KEY,
[]))
configurations = self.agent_state['configurations']
configurations['routers'] = num_routers
configurations['ex_gw_ports'] = num_ex_gw_ports
configurations['interfaces'] = num_interfaces
configurations['floating_ips'] = num_floating_ips
try:
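            # The positional True is use_call: report state with a blocking
            # RPC call so the server's view of this agent (e.g.
            # AGENT_REVIVED) is returned.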
agent_status = self.state_rpc.report_state(self.context,
self.agent_state,
True)
if agent_status == agent_consts.AGENT_REVIVED:
LOG.info('Agent has just been revived. '
'Doing a full sync.')
self.fullsync = True
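            # 'start_flag' is only meaningful on the first successful report
            # after a (re)start; drop it once it has been sent.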
self.agent_state.pop('start_flag', None)
except AttributeError:
# This means the server does not support report_state
LOG.warning("Neutron server does not support state report. "
"State report for this agent will be disabled.")
self.heartbeat.stop()
return
except Exception:
LOG.exception("Failed reporting state!")
def after_start(self):
eventlet.spawn_n(self._process_routers_loop)
LOG.info("L3 agent started")
        # Report state before we do the first full sync.
self._report_state()
self.pd.after_start()
def agent_updated(self, context, payload):
"""Handle the agent_updated notification event."""
self.fullsync = True
LOG.info("agent_updated by server side %s!", payload)