remove unnecessary neutron files under neutron/agent

Change-Id: Id9657dd4bbb8da07988a22fd8531b49352cf2253
changes/09/104809/1
Isaku Yamahata 9 years ago
parent 34b7393388
commit d0fd540c3c

@ -1,622 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
import eventlet
eventlet.monkey_patch()
import netaddr
from oslo.config import cfg
from neutron.agent.common import config
from neutron.agent.linux import dhcp
from neutron.agent.linux import external_process
from neutron.agent.linux import interface
from neutron.agent.linux import ovs_lib # noqa
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import service
from neutron import service as neutron_service
LOG = logging.getLogger(__name__)
class DhcpAgent(manager.Manager):
OPTS = [
cfg.IntOpt('resync_interval', default=5,
help=_("Interval to resync.")),
cfg.StrOpt('dhcp_driver',
default='neutron.agent.linux.dhcp.Dnsmasq',
help=_("The driver used to manage the DHCP server.")),
cfg.BoolOpt('enable_isolated_metadata', default=False,
help=_("Support Metadata requests on isolated networks.")),
cfg.BoolOpt('enable_metadata_network', default=False,
help=_("Allows for serving metadata requests from a "
"dedicated network. Requires "
"enable_isolated_metadata = True")),
cfg.IntOpt('num_sync_threads', default=4,
help=_('Number of threads to use during sync process.')),
cfg.StrOpt('metadata_proxy_socket',
default='$state_path/metadata_proxy',
help=_('Location of Metadata Proxy UNIX domain '
'socket')),
]
def __init__(self, host=None):
super(DhcpAgent, self).__init__(host=host)
self.needs_resync_reasons = []
self.conf = cfg.CONF
self.cache = NetworkCache()
self.root_helper = config.get_root_helper(self.conf)
self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
ctx = context.get_admin_context_without_session()
self.plugin_rpc = DhcpPluginApi(topics.PLUGIN,
ctx, self.conf.use_namespaces)
# create dhcp dir to store dhcp info
dhcp_dir = os.path.dirname("/%s/dhcp/" % self.conf.state_path)
if not os.path.isdir(dhcp_dir):
os.makedirs(dhcp_dir, 0o755)
self.dhcp_version = self.dhcp_driver_cls.check_version()
self._populate_networks_cache()
def _populate_networks_cache(self):
"""Populate the networks cache when the DHCP-agent starts."""
try:
existing_networks = self.dhcp_driver_cls.existing_dhcp_networks(
self.conf,
self.root_helper
)
for net_id in existing_networks:
net = dhcp.NetModel(self.conf.use_namespaces,
{"id": net_id,
"subnets": [],
"ports": []})
self.cache.put(net)
except NotImplementedError:
# just go ahead with an empty networks cache
LOG.debug(
_("The '%s' DHCP-driver does not support retrieving of a "
"list of existing networks"),
self.conf.dhcp_driver
)
def after_start(self):
self.run()
LOG.info(_("DHCP agent started"))
def run(self):
"""Activate the DHCP agent."""
self.sync_state()
self.periodic_resync()
def call_driver(self, action, network, **action_kwargs):
"""Invoke an action on a DHCP driver instance."""
LOG.debug(_('Calling driver for network: %(net)s action: %(action)s'),
{'net': network.id, 'action': action})
try:
# the Driver expects something that is duck typed similar to
# the base models.
driver = self.dhcp_driver_cls(self.conf,
network,
self.root_helper,
self.dhcp_version,
self.plugin_rpc)
getattr(driver, action)(**action_kwargs)
return True
except exceptions.Conflict:
# No need to resync here, the agent will receive the event related
# to a status update for the network
LOG.warning(_('Unable to %(action)s dhcp for %(net_id)s: there is '
'a conflict with its current state; please check '
'that the network and/or its subnet(s) still exist.')
% {'net_id': network.id, 'action': action})
except Exception as e:
self.schedule_resync(e)
if (isinstance(e, rpc_compat.RemoteError)
and e.exc_type == 'NetworkNotFound'
or isinstance(e, exceptions.NetworkNotFound)):
LOG.warning(_("Network %s has been deleted."), network.id)
else:
LOG.exception(_('Unable to %(action)s dhcp for %(net_id)s.')
% {'net_id': network.id, 'action': action})
def schedule_resync(self, reason):
"""Schedule a resync for a given reason."""
self.needs_resync_reasons.append(reason)
@utils.synchronized('dhcp-agent')
def sync_state(self):
"""Sync the local DHCP state with Neutron."""
LOG.info(_('Synchronizing state'))
pool = eventlet.GreenPool(cfg.CONF.num_sync_threads)
known_network_ids = set(self.cache.get_network_ids())
try:
active_networks = self.plugin_rpc.get_active_networks_info()
active_network_ids = set(network.id for network in active_networks)
for deleted_id in known_network_ids - active_network_ids:
try:
self.disable_dhcp_helper(deleted_id)
except Exception as e:
self.schedule_resync(e)
LOG.exception(_('Unable to sync network state on deleted '
'network %s'), deleted_id)
for network in active_networks:
pool.spawn(self.safe_configure_dhcp_for_network, network)
pool.waitall()
LOG.info(_('Synchronizing state complete'))
except Exception as e:
self.schedule_resync(e)
LOG.exception(_('Unable to sync network state.'))
def _periodic_resync_helper(self):
"""Resync the dhcp state at the configured interval."""
while True:
eventlet.sleep(self.conf.resync_interval)
if self.needs_resync_reasons:
# be careful to avoid a race with additions to list
# from other threads
reasons = self.needs_resync_reasons
self.needs_resync_reasons = []
for r in reasons:
LOG.debug(_("resync: %(reason)s"),
{"reason": r})
self.sync_state()
def periodic_resync(self):
"""Spawn a thread to periodically resync the dhcp state."""
eventlet.spawn(self._periodic_resync_helper)
def safe_get_network_info(self, network_id):
try:
network = self.plugin_rpc.get_network_info(network_id)
if not network:
LOG.warn(_('Network %s has been deleted.'), network_id)
return network
except Exception as e:
self.schedule_resync(e)
LOG.exception(_('Network %s info call failed.'), network_id)
def enable_dhcp_helper(self, network_id):
"""Enable DHCP for a network that meets enabling criteria."""
network = self.safe_get_network_info(network_id)
if network:
self.configure_dhcp_for_network(network)
def safe_configure_dhcp_for_network(self, network):
try:
self.configure_dhcp_for_network(network)
except (exceptions.NetworkNotFound, RuntimeError):
LOG.warn(_('Network %s may have been deleted and its resources '
'may have already been disposed.'), network.id)
def configure_dhcp_for_network(self, network):
if not network.admin_state_up:
return
for subnet in network.subnets:
if subnet.enable_dhcp:
if self.call_driver('enable', network):
if (self.conf.use_namespaces and
self.conf.enable_isolated_metadata):
self.enable_isolated_metadata_proxy(network)
self.cache.put(network)
break
def disable_dhcp_helper(self, network_id):
"""Disable DHCP for a network known to the agent."""
network = self.cache.get_network_by_id(network_id)
if network:
if (self.conf.use_namespaces and
self.conf.enable_isolated_metadata):
self.disable_isolated_metadata_proxy(network)
if self.call_driver('disable', network):
self.cache.remove(network)
def refresh_dhcp_helper(self, network_id):
"""Refresh or disable DHCP for a network depending on the current state
of the network.
"""
old_network = self.cache.get_network_by_id(network_id)
if not old_network:
# DHCP current not running for network.
return self.enable_dhcp_helper(network_id)
network = self.safe_get_network_info(network_id)
if not network:
return
old_cidrs = set(s.cidr for s in old_network.subnets if s.enable_dhcp)
new_cidrs = set(s.cidr for s in network.subnets if s.enable_dhcp)
if new_cidrs and old_cidrs == new_cidrs:
self.call_driver('reload_allocations', network)
self.cache.put(network)
elif new_cidrs:
if self.call_driver('restart', network):
self.cache.put(network)
else:
self.disable_dhcp_helper(network.id)
@utils.synchronized('dhcp-agent')
def network_create_end(self, context, payload):
"""Handle the network.create.end notification event."""
network_id = payload['network']['id']
self.enable_dhcp_helper(network_id)
@utils.synchronized('dhcp-agent')
def network_update_end(self, context, payload):
"""Handle the network.update.end notification event."""
network_id = payload['network']['id']
if payload['network']['admin_state_up']:
self.enable_dhcp_helper(network_id)
else:
self.disable_dhcp_helper(network_id)
@utils.synchronized('dhcp-agent')
def network_delete_end(self, context, payload):
"""Handle the network.delete.end notification event."""
self.disable_dhcp_helper(payload['network_id'])
@utils.synchronized('dhcp-agent')
def subnet_update_end(self, context, payload):
"""Handle the subnet.update.end notification event."""
network_id = payload['subnet']['network_id']
self.refresh_dhcp_helper(network_id)
# Use the update handler for the subnet create event.
subnet_create_end = subnet_update_end
@utils.synchronized('dhcp-agent')
def subnet_delete_end(self, context, payload):
"""Handle the subnet.delete.end notification event."""
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if network:
self.refresh_dhcp_helper(network.id)
@utils.synchronized('dhcp-agent')
def port_update_end(self, context, payload):
"""Handle the port.update.end notification event."""
updated_port = dhcp.DictModel(payload['port'])
network = self.cache.get_network_by_id(updated_port.network_id)
if network:
self.cache.put_port(updated_port)
self.call_driver('reload_allocations', network)
# Use the update handler for the port create event.
port_create_end = port_update_end
@utils.synchronized('dhcp-agent')
def port_delete_end(self, context, payload):
"""Handle the port.delete.end notification event."""
port = self.cache.get_port_by_id(payload['port_id'])
if port:
network = self.cache.get_network_by_id(port.network_id)
self.cache.remove_port(port)
self.call_driver('reload_allocations', network)
def enable_isolated_metadata_proxy(self, network):
# The proxy might work for either a single network
# or all the networks connected via a router
# to the one passed as a parameter
neutron_lookup_param = '--network_id=%s' % network.id
meta_cidr = netaddr.IPNetwork(dhcp.METADATA_DEFAULT_CIDR)
has_metadata_subnet = any(netaddr.IPNetwork(s.cidr) in meta_cidr
for s in network.subnets)
if (self.conf.enable_metadata_network and has_metadata_subnet):
router_ports = [port for port in network.ports
if (port.device_owner ==
constants.DEVICE_OWNER_ROUTER_INTF)]
if router_ports:
# Multiple router ports should not be allowed
if len(router_ports) > 1:
LOG.warning(_("%(port_num)d router ports found on the "
"metadata access network. Only the port "
"%(port_id)s, for router %(router_id)s "
"will be considered"),
{'port_num': len(router_ports),
'port_id': router_ports[0].id,
'router_id': router_ports[0].device_id})
neutron_lookup_param = ('--router_id=%s' %
router_ports[0].device_id)
def callback(pid_file):
metadata_proxy_socket = cfg.CONF.metadata_proxy_socket
proxy_cmd = ['neutron-ns-metadata-proxy',
'--pid_file=%s' % pid_file,
'--metadata_proxy_socket=%s' % metadata_proxy_socket,
neutron_lookup_param,
'--state_path=%s' % self.conf.state_path,
'--metadata_port=%d' % dhcp.METADATA_PORT]
proxy_cmd.extend(config.get_log_args(
cfg.CONF, 'neutron-ns-metadata-proxy-%s.log' % network.id))
return proxy_cmd
pm = external_process.ProcessManager(
self.conf,
network.id,
self.root_helper,
network.namespace)
pm.enable(callback)
def disable_isolated_metadata_proxy(self, network):
pm = external_process.ProcessManager(
self.conf,
network.id,
self.root_helper,
network.namespace)
pm.disable()
class DhcpPluginApi(rpc_compat.RpcProxy):
"""Agent side of the dhcp rpc API.
API version history:
1.0 - Initial version.
1.1 - Added get_active_networks_info, create_dhcp_port,
and update_dhcp_port methods.
"""
BASE_RPC_API_VERSION = '1.1'
def __init__(self, topic, context, use_namespaces):
super(DhcpPluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.context = context
self.host = cfg.CONF.host
self.use_namespaces = use_namespaces
def get_active_networks_info(self):
"""Make a remote process call to retrieve all network info."""
networks = self.call(self.context,
self.make_msg('get_active_networks_info',
host=self.host),
topic=self.topic)
return [dhcp.NetModel(self.use_namespaces, n) for n in networks]
def get_network_info(self, network_id):
"""Make a remote process call to retrieve network info."""
network = self.call(self.context,
self.make_msg('get_network_info',
network_id=network_id,
host=self.host),
topic=self.topic)
if network:
return dhcp.NetModel(self.use_namespaces, network)
def get_dhcp_port(self, network_id, device_id):
"""Make a remote process call to get the dhcp port."""
port = self.call(self.context,
self.make_msg('get_dhcp_port',
network_id=network_id,
device_id=device_id,
host=self.host),
topic=self.topic)
if port:
return dhcp.DictModel(port)
def create_dhcp_port(self, port):
"""Make a remote process call to create the dhcp port."""
port = self.call(self.context,
self.make_msg('create_dhcp_port',
port=port,
host=self.host),
topic=self.topic)
if port:
return dhcp.DictModel(port)
def update_dhcp_port(self, port_id, port):
"""Make a remote process call to update the dhcp port."""
port = self.call(self.context,
self.make_msg('update_dhcp_port',
port_id=port_id,
port=port,
host=self.host),
topic=self.topic)
if port:
return dhcp.DictModel(port)
def release_dhcp_port(self, network_id, device_id):
"""Make a remote process call to release the dhcp port."""
return self.call(self.context,
self.make_msg('release_dhcp_port',
network_id=network_id,
device_id=device_id,
host=self.host),
topic=self.topic)
def release_port_fixed_ip(self, network_id, device_id, subnet_id):
"""Make a remote process call to release a fixed_ip on the port."""
return self.call(self.context,
self.make_msg('release_port_fixed_ip',
network_id=network_id,
subnet_id=subnet_id,
device_id=device_id,
host=self.host),
topic=self.topic)
class NetworkCache(object):
"""Agent cache of the current network state."""
def __init__(self):
self.cache = {}
self.subnet_lookup = {}
self.port_lookup = {}
def get_network_ids(self):
return self.cache.keys()
def get_network_by_id(self, network_id):
return self.cache.get(network_id)
def get_network_by_subnet_id(self, subnet_id):
return self.cache.get(self.subnet_lookup.get(subnet_id))
def get_network_by_port_id(self, port_id):
return self.cache.get(self.port_lookup.get(port_id))
def put(self, network):
if network.id in self.cache:
self.remove(self.cache[network.id])
self.cache[network.id] = network
for subnet in network.subnets:
self.subnet_lookup[subnet.id] = network.id
for port in network.ports:
self.port_lookup[port.id] = network.id
def remove(self, network):
del self.cache[network.id]
for subnet in network.subnets:
del self.subnet_lookup[subnet.id]
for port in network.ports:
del self.port_lookup[port.id]
def put_port(self, port):
network = self.get_network_by_id(port.network_id)
for index in range(len(network.ports)):
if network.ports[index].id == port.id:
network.ports[index] = port
break
else:
network.ports.append(port)
self.port_lookup[port.id] = network.id
def remove_port(self, port):
network = self.get_network_by_port_id(port.id)
for index in range(len(network.ports)):
if network.ports[index] == port:
del network.ports[index]
del self.port_lookup[port.id]
break
def get_port_by_id(self, port_id):
network = self.get_network_by_port_id(port_id)
if network:
for port in network.ports:
if port.id == port_id:
return port
def get_state(self):
net_ids = self.get_network_ids()
num_nets = len(net_ids)
num_subnets = 0
num_ports = 0
for net_id in net_ids:
network = self.get_network_by_id(net_id)
num_subnets += len(network.subnets)
num_ports += len(network.ports)
return {'networks': num_nets,
'subnets': num_subnets,
'ports': num_ports}
class DhcpAgentWithStateReport(DhcpAgent):
def __init__(self, host=None):
super(DhcpAgentWithStateReport, self).__init__(host=host)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
self.agent_state = {
'binary': 'neutron-dhcp-agent',
'host': host,
'topic': topics.DHCP_AGENT,
'configurations': {
'dhcp_driver': cfg.CONF.dhcp_driver,
'use_namespaces': cfg.CONF.use_namespaces,
'dhcp_lease_duration': cfg.CONF.dhcp_lease_duration},
'start_flag': True,
'agent_type': constants.AGENT_TYPE_DHCP}
report_interval = cfg.CONF.AGENT.report_interval
self.use_call = True
if report_interval:
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
self.heartbeat.start(interval=report_interval)
def _report_state(self):
try:
self.agent_state.get('configurations').update(
self.cache.get_state())
ctx = context.get_admin_context_without_session()
self.state_rpc.report_state(ctx, self.agent_state, self.use_call)
self.use_call = False
except AttributeError:
# This means the server does not support report_state
LOG.warn(_("Neutron server does not support state report."
" State report for this agent will be disabled."))
self.heartbeat.stop()
self.run()
return
except Exception:
LOG.exception(_("Failed reporting state!"))
return
if self.agent_state.pop('start_flag', None):
self.run()
def agent_updated(self, context, payload):
"""Handle the agent_updated notification event."""
self.schedule_resync(_("Agent updated: %(payload)s") %
{"payload": payload})
LOG.info(_("agent_updated by server side %s!"), payload)
def after_start(self):
LOG.info(_("DHCP agent started"))
def register_options():
cfg.CONF.register_opts(DhcpAgent.OPTS)
config.register_interface_driver_opts_helper(cfg.CONF)
config.register_use_namespaces_opts_helper(cfg.CONF)
config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)
cfg.CONF.register_opts(dhcp.OPTS)
cfg.CONF.register_opts(interface.OPTS)
def main():
register_options()
common_config.init(sys.argv[1:])
config.setup_logging(cfg.CONF)
server = neutron_service.Service.create(
binary='neutron-dhcp-agent',
topic=topics.DHCP_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager='neutron.agent.dhcp_agent.DhcpAgentWithStateReport')
service.launch(server).wait()

@ -1,138 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import contextlib
import six
@six.add_metaclass(abc.ABCMeta)
class FirewallDriver(object):
"""Firewall Driver base class.
Defines methods that any driver providing security groups
and provider firewall functionality should implement.
Note port attribute should have information of security group ids and
security group rules.
the dict of port should have
device : interface name
fixed_ips: ips of the device
mac_address: mac_address of the device
security_groups: [sgid, sgid]
security_group_rules : [ rule, rule ]
the rule must contain ethertype and direction
the rule may contain security_group_id,
protocol, port_min, port_max
source_ip_prefix, source_port_min,
source_port_max, dest_ip_prefix, and
remote_group_id
Note: source_group_ip in REST API should be converted by this rule
if direction is ingress:
remote_group_ip will be a source_ip_prefix
if direction is egress:
remote_group_ip will be a dest_ip_prefix
Note: remote_group_id in REST API should be converted by this rule
if direction is ingress:
remote_group_id will be a list of source_ip_prefix
if direction is egress:
remote_group_id will be a list of dest_ip_prefix
remote_group_id will also remaining membership update management
"""
def prepare_port_filter(self, port):
"""Prepare filters for the port.
This method should be called before the port is created.
"""
raise NotImplementedError()
def apply_port_filter(self, port):
"""Apply port filter.
Once this method returns, the port should be firewalled
appropriately. This method should as far as possible be a
no-op. It's vastly preferred to get everything set up in
prepare_port_filter.
"""
raise NotImplementedError()
def update_port_filter(self, port):
"""Refresh security group rules from data store
Gets called when an port gets added to or removed from
the security group the port is a member of or if the
group gains or looses a rule.
"""
raise NotImplementedError()
def remove_port_filter(self, port):
"""Stop filtering port."""
raise NotImplementedError()
def filter_defer_apply_on(self):
"""Defer application of filtering rule."""
pass
def filter_defer_apply_off(self):
"""Turn off deferral of rules and apply the rules now."""
pass
@property
def ports(self):
"""Returns filtered ports."""
pass
@contextlib.contextmanager
def defer_apply(self):
"""Defer apply context."""
self.filter_defer_apply_on()
try:
yield
finally:
self.filter_defer_apply_off()
class NoopFirewallDriver(FirewallDriver):
"""Noop Firewall Driver.
Firewall driver which does nothing.
This driver is for disabling the firewall functionality.
"""
def prepare_port_filter(self, port):
pass
def apply_port_filter(self, port):
pass
def update_port_filter(self, port):
pass
def remove_port_filter(self, port):
pass
def filter_defer_apply_on(self):
pass
def filter_defer_apply_off(self):
pass
@property
def ports(self):
return {}

@ -1,56 +0,0 @@
# Copyright (c) 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Sylvain Afchain, eNovance SAS
# @author: Francois Eleouet, Orange
# @author: Mathieu Rohon, Orange
import abc
from oslo.config import cfg
import six
from neutron.common import log
@six.add_metaclass(abc.ABCMeta)
class L2populationRpcCallBackMixin(object):
@log.log
def add_fdb_entries(self, context, fdb_entries, host=None):
if not host or host == cfg.CONF.host:
self.fdb_add(context, fdb_entries)
@log.log
def remove_fdb_entries(self, context, fdb_entries, host=None):
if not host or host == cfg.CONF.host:
self.fdb_remove(context, fdb_entries)
@log.log
def update_fdb_entries(self, context, fdb_entries, host=None):
if not host or host == cfg.CONF.host:
self.fdb_update(context, fdb_entries)
@abc.abstractmethod
def fdb_add(self, context, fdb_entries):
pass
@abc.abstractmethod
def fdb_remove(self, context, fdb_entries):
pass
@abc.abstractmethod
def fdb_update(self, context, fdb_entries):
pass

@ -1,990 +0,0 @@
# Copyright 2012 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import sys
import eventlet
eventlet.monkey_patch()
import netaddr
from oslo.config import cfg
from neutron.agent.common import config
from neutron.agent.linux import external_process
from neutron.agent.linux import interface
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.agent.linux import ovs_lib # noqa
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants as l3_constants
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils as common_utils
from neutron import context
from neutron import manager
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
from neutron.openstack.common import processutils
from neutron.openstack.common import service
from neutron import service as neutron_service
from neutron.services.firewall.agents.l3reference import firewall_l3_agent
LOG = logging.getLogger(__name__)
NS_PREFIX = 'qrouter-'
INTERNAL_DEV_PREFIX = 'qr-'
EXTERNAL_DEV_PREFIX = 'qg-'
RPC_LOOP_INTERVAL = 1
FLOATING_IP_CIDR_SUFFIX = '/32'
class L3PluginApi(rpc_compat.RpcProxy):
"""Agent side of the l3 agent RPC API.
API version history:
1.0 - Initial version.
1.1 - Floating IP operational status updates
"""
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic, host):
super(L3PluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.host = host
def get_routers(self, context, router_ids=None):
"""Make a remote process call to retrieve the sync data for routers."""
return self.call(context,
self.make_msg('sync_routers', host=self.host,
router_ids=router_ids),
topic=self.topic)
def get_external_network_id(self, context):
"""Make a remote process call to retrieve the external network id.
@raise rpc_compat.RemoteError: with TooManyExternalNetworks
as exc_type if there are
more than one external network
"""
return self.call(context,
self.make_msg('get_external_network_id',
host=self.host),
topic=self.topic)
def update_floatingip_statuses(self, context, router_id, fip_statuses):
"""Call the plugin update floating IPs's operational status."""
return self.call(context,
self.make_msg('update_floatingip_statuses',
router_id=router_id,
fip_statuses=fip_statuses),
topic=self.topic,
version='1.1')
class RouterInfo(object):
def __init__(self, router_id, root_helper, use_namespaces, router):
self.router_id = router_id
self.ex_gw_port = None
self._snat_enabled = None
self._snat_action = None
self.internal_ports = []
self.floating_ips = set()
self.root_helper = root_helper
self.use_namespaces = use_namespaces
# Invoke the setter for establishing initial SNAT action
self.router = router
self.ns_name = NS_PREFIX + router_id if use_namespaces else None
self.iptables_manager = iptables_manager.IptablesManager(
root_helper=root_helper,
#FIXME(danwent): use_ipv6=True,
namespace=self.ns_name)
self.routes = []
@property
def router(self):
return self._router
@router.setter
def router(self, value):
self._router = value
if not self._router:
return
# enable_snat by default if it wasn't specified by plugin
self._snat_enabled = self._router.get('enable_snat', True)
# Set a SNAT action for the router
if self._router.get('gw_port'):
self._snat_action = ('add_rules' if self._snat_enabled
else 'remove_rules')
elif self.ex_gw_port:
# Gateway port was removed, remove rules
self._snat_action = 'remove_rules'
def perform_snat_action(self, snat_callback, *args):
# Process SNAT rules for attached subnets
if self._snat_action:
snat_callback(self, self._router.get('gw_port'),
*args, action=self._snat_action)
self._snat_action = None
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
"""Manager for L3NatAgent
API version history:
1.0 initial Version
1.1 changed the type of the routers parameter
to the routers_updated method.
It was previously a list of routers in dict format.
It is now a list of router IDs only.
Per rpc versioning rules, it is backwards compatible.
"""
RPC_API_VERSION = '1.1'
OPTS = [
cfg.StrOpt('external_network_bridge', default='br-ex',
help=_("Name of bridge used for external network "
"traffic.")),
cfg.IntOpt('metadata_port',
default=9697,
help=_("TCP Port used by Neutron metadata namespace "
"proxy.")),
cfg.IntOpt('send_arp_for_ha',
default=3,
help=_("Send this many gratuitous ARPs for HA setup, if "
"less than or equal to 0, the feature is disabled")),
cfg.StrOpt('router_id', default='',
help=_("If namespaces is disabled, the l3 agent can only"
" configure a router that has the matching router "
"ID.")),
cfg.BoolOpt('handle_internal_only_routers',
default=True,
help=_("Agent should implement routers with no gateway")),
cfg.StrOpt('gateway_external_network_id', default='',
help=_("UUID of external network for routers implemented "
"by the agents.")),
cfg.BoolOpt('enable_metadata_proxy', default=True,
help=_("Allow running metadata proxy.")),
cfg.BoolOpt('router_delete_namespaces', default=False,
help=_("Delete namespace after removing a router.")),
cfg.StrOpt('metadata_proxy_socket',
default='$state_path/metadata_proxy',
help=_('Location of Metadata Proxy UNIX domain '
'socket')),
]
def __init__(self, host, conf=None):
if conf:
self.conf = conf
else:
self.conf = cfg.CONF
self.root_helper = config.get_root_helper(self.conf)
self.router_info = {}
self._check_config_params()
try:
self.driver = importutils.import_object(
self.conf.interface_driver,
self.conf
)
except Exception:
msg = _("Error importing interface driver "
"'%s'") % self.conf.interface_driver
LOG.error(msg)
raise SystemExit(1)
self.context = context.get_admin_context_without_session()
self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
self.fullsync = True
self.updated_routers = set()
self.removed_routers = set()
self.sync_progress = False
self._clean_stale_namespaces = self.conf.use_namespaces
self.rpc_loop = loopingcall.FixedIntervalLoopingCall(
self._rpc_loop)
self.rpc_loop.start(interval=RPC_LOOP_INTERVAL)
super(L3NATAgent, self).__init__(conf=self.conf)
self.target_ex_net_id = None
def _check_config_params(self):
"""Check items in configuration files.
Check for required and invalid configuration items.
The actual values are not verified for correctness.
"""
if not self.conf.interface_driver:
msg = _('An interface driver must be specified')
LOG.error(msg)
raise SystemExit(1)
if not self.conf.use_namespaces and not self.conf.router_id:
msg = _('Router id is required if not using namespaces.')
LOG.error(msg)
raise SystemExit(1)
def _cleanup_namespaces(self, routers):
"""Destroy stale router namespaces on host when L3 agent restarts
This routine is called when self._clean_stale_namespaces is True.
The argument routers is the list of routers that are recorded in
the database as being hosted on this node.
"""
try:
root_ip = ip_lib.IPWrapper(self.root_helper)
host_namespaces = root_ip.get_namespaces(self.root_helper)
router_namespaces = set(ns for ns in host_namespaces
if ns.startswith(NS_PREFIX))
ns_to_ignore = set(NS_PREFIX + r['id'] for r in routers)
ns_to_destroy = router_namespaces - ns_to_ignore
except RuntimeError:
LOG.exception(_('RuntimeError in obtaining router list '
'for namespace cleanup.'))
else:
self._destroy_stale_router_namespaces(ns_to_destroy)
def _destroy_stale_router_namespaces(self, router_namespaces):
"""Destroys the stale router namespaces
The argumenet router_namespaces is a list of stale router namespaces
As some stale router namespaces may not be able to be deleted, only
one attempt will be made to delete them.
"""
for ns in router_namespaces:
if self.conf.enable_metadata_proxy:
self._destroy_metadata_proxy(ns[len(NS_PREFIX):], ns)
try:
self._destroy_router_namespace(ns)
except RuntimeError:
LOG.exception(_('Failed to destroy stale router namespace '
'%s'), ns)
self._clean_stale_namespaces = False
def _destroy_router_namespace(self, namespace):
ns_ip = ip_lib.IPWrapper(self.root_helper, namespace=namespace)
for d in ns_ip.get_devices(exclude_loopback=True):
if d.name.startswith(INTERNAL_DEV_PREFIX):
# device is on default bridge
self.driver.unplug(d.name, namespace=namespace,
prefix=INTERNAL_DEV_PREFIX)
elif d.name.startswith(EXTERNAL_DEV_PREFIX):
self.driver.unplug(d.name,
bridge=self.conf.external_network_bridge,
namespace=namespace,
prefix=EXTERNAL_DEV_PREFIX)
if self.conf.router_delete_namespaces:
try:
ns_ip.netns.delete(namespace)
except RuntimeError:
msg = _('Failed trying to delete namespace: %s')
LOG.exception(msg % namespace)
def _create_router_namespace(self, ri):
ip_wrapper_root = ip_lib.IPWrapper(self.root_helper)
ip_wrapper = ip_wrapper_root.ensure_namespace(ri.ns_name)
ip_wrapper.netns.execute(['sysctl', '-w', 'net.ipv4.ip_forward=1'])
def _fetch_external_net_id(self, force=False):
"""Find UUID of single external network for this agent."""
if self.conf.gateway_external_network_id:
return self.conf.gateway_external_network_id
# L3 agent doesn't use external_network_bridge to handle external
# networks, so bridge_mappings with provider networks will be used
# and the L3 agent is able to handle any external networks.
if not self.conf.external_network_bridge:
return
if not force and self.target_ex_net_id:
return self.target_ex_net_id
try:
self.target_ex_net_id = self.plugin_rpc.get_external_network_id(
self.context)
return self.target_ex_net_id
except rpc_compat.RemoteError as e:
with excutils.save_and_reraise_exception() as ctx:
if e.exc_type == 'TooManyExternalNetworks':
ctx.reraise = False
msg = _(
"The 'gateway_external_network_id' option must be "
"configured for this agent as Neutron has more than "
"one external network.")
raise Exception(msg)
def _router_added(self, router_id, router):
ri = RouterInfo(router_id, self.root_helper,
self.conf.use_namespaces, router)
self.router_info[router_id] = ri
if self.conf.use_namespaces:
self._create_router_namespace(ri)
for c, r in self.metadata_filter_rules():
ri.iptables_manager.ipv4['filter'].add_rule(c, r)
for c, r in self.metadata_nat_rules():
ri.iptables_manager.ipv4['nat'].add_rule(c, r)
ri.iptables_manager.apply()
super(L3NATAgent, self).process_router_add(ri)
if self.conf.enable_metadata_proxy:
self._spawn_metadata_proxy(ri.router_id, ri.ns_name)
def _router_removed(self, router_id):
ri = self.router_info.get(router_id)
if ri is None:
LOG.warn(_("Info for router %s were not found. "
"Skipping router removal"), router_id)
return
ri.router['gw_port'] = None
ri.router[l3_constants.INTERFACE_KEY] = []
ri.router[l3_constants.FLOATINGIP_KEY] = []
self.process_router(ri)
for c, r in self.metadata_filter_rules():
ri.iptables_manager.ipv4['filter'].remove_rule(c, r)
for c, r in self.metadata_nat_rules():
ri.iptables_manager.ipv4['nat'].remove_rule(c, r)
ri.iptables_manager.apply()
if self.conf.enable_metadata_proxy:
self._destroy_metadata_proxy(ri.router_id, ri.ns_name)
del self.router_info[router_id]
self._destroy_router_namespace(ri.ns_name)
def _spawn_metadata_proxy(self, router_id, ns_name):
def callback(pid_file):
metadata_proxy_socket = cfg.CONF.metadata_proxy_socket
proxy_cmd = ['neutron-ns-metadata-proxy',
'--pid_file=%s' % pid_file,
'--metadata_proxy_socket=%s' % metadata_proxy_socket,
'--router_id=%s' % router_id,
'--state_path=%s' % self.conf.state_path,
'--metadata_port=%s' % self.conf.metadata_port]
proxy_cmd.extend(config.get_log_args(
cfg.CONF, 'neutron-ns-metadata-proxy-%s.log' %
router_id))
return proxy_cmd
pm = external_process.ProcessManager(
self.conf,
router_id,
self.root_helper,
ns_name)
pm.enable(callback)
def _destroy_metadata_proxy(self, router_id, ns_name):
pm = external_process.ProcessManager(
self.conf,
router_id,
self.root_helper,
ns_name)
pm.disable()
def _set_subnet_info(self, port):
ips = port['fixed_ips']
if not ips:
raise Exception(_("Router port %s has no IP address") % port['id'])
if len(ips) > 1:
LOG.error(_("Ignoring multiple IPs on router port %s"),
port['id'])
prefixlen = netaddr.IPNetwork(port['subnet']['cidr']).prefixlen
port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen)
def _get_existing_devices(self, ri):
ip_wrapper = ip_lib.IPWrapper(root_helper=self.root_helper,
namespace=ri.ns_name)
ip_devs = ip_wrapper.get_devices(exclude_loopback=True)
return [ip_dev.name for ip_dev in ip_devs]
def process_router(self, ri):
ri.iptables_manager.defer_apply_on()
ex_gw_port = self._get_ex_gw_port(ri)
internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
existing_port_ids = set([p['id'] for p in ri.internal_ports])
current_port_ids = set([p['id'] for p in internal_ports
if p['admin_state_up']])
new_ports = [p for p in internal_ports if
p['id'] in current_port_ids and
p['id'] not in existing_port_ids]
old_ports = [p for p in ri.internal_ports if
p['id'] not in current_port_ids]
for p in new_ports:
self._set_subnet_info(p)
self.internal_network_added(ri, p['network_id'], p['id'],
p['ip_cidr'], p['mac_address'])
ri.internal_ports.append(p)
for p in old_ports:
self.internal_network_removed(ri, p['id'], p['ip_cidr'])
ri.internal_ports.remove(p)
existing_devices = self._get_existing_devices(ri)
current_internal_devs = set([n for n in existing_devices
if n.startswith(INTERNAL_DEV_PREFIX)])
current_port_devs = set([self.get_internal_device_name(id) for
id in current_port_ids])
stale_devs = current_internal_devs - current_port_devs
for stale_dev in stale_devs:
LOG.debug(_('Deleting stale internal router device: %s'),
stale_dev)
self.driver.unplug(stale_dev,
namespace=ri.ns_name,
prefix=INTERNAL_DEV_PREFIX)
# Get IPv4 only internal CIDRs
internal_cidrs = [p['ip_cidr'] for p in ri.internal_ports
if netaddr.IPNetwork(p['ip_cidr']).version == 4]
# TODO(salv-orlando): RouterInfo would be a better place for
# this logic too
ex_gw_port_id = (ex_gw_port and ex_gw_port['id'] or
ri.ex_gw_port and ri.ex_gw_port['id'])
interface_name = None
if ex_gw_port_id:
interface_name = self.get_external_device_name(ex_gw_port_id)
if ex_gw_port and ex_gw_port != ri.ex_gw_port:
self._set_subnet_info(ex_gw_port)
self.external_gateway_added(ri, ex_gw_port,
interface_name, internal_cidrs)
elif not ex_gw_port and ri.ex_gw_port:
self.external_gateway_removed(ri, ri.ex_gw_port,
interface_name, internal_cidrs)
stale_devs = [dev for dev in existing_devices
if dev.startswith(EXTERNAL_DEV_PREFIX)
and dev != interface_name]
for stale_dev in stale_devs:
LOG.debug(_('Deleting stale external router device: %s'),
stale_dev)
self.driver.unplug(stale_dev,
bridge=self.conf.external_network_bridge,
namespace=ri.ns_name,
prefix=EXTERNAL_DEV_PREFIX)
# Process static routes for router
self.routes_updated(ri)
# Process SNAT rules for external gateway
ri.perform_snat_action(self._handle_router_snat_rules,
internal_cidrs, interface_name)
# Process SNAT/DNAT rules for floating IPs
fip_statuses = {}
try:
if ex_gw_port:
existing_floating_ips = ri.floating_ips
self.process_router_floating_ip_nat_rules(ri)
ri.iptables_manager.defer_apply_off()
# Once NAT rules for floating IPs are safely in place
# configure their addresses on the external gateway port
fip_statuses = self.process_router_floating_ip_addresses(
ri, ex_gw_port)
except Exception:
# TODO(salv-orlando): Less broad catching
# All floating IPs must be put in error state
for fip in ri.router.get(l3_constants.FLOATINGIP_KEY, []):
fip_statuses[fip['id']] = l3_constants.FLOATINGIP_STATUS_ERROR
if ex_gw_port:
# Identify floating IPs which were disabled
ri.floating_ips = set(fip_statuses.keys())
for fip_id in existing_floating_ips - ri.floating_ips:
fip_statuses[fip_id] = l3_constants.FLOATINGIP_STATUS_DOWN
# Update floating IP status on the neutron server
self.plugin_rpc.update_floatingip_statuses(
self.context, ri.router_id, fip_statuses)
# Update ex_gw_port and enable_snat on the router info cache
ri.ex_gw_port = ex_gw_port
ri.enable_snat = ri.router.get('enable_snat')
def _handle_router_snat_rules(self, ri, ex_gw_port, internal_cidrs,
interface_name, action):
# Remove all the rules
# This is safe because if use_namespaces is set as False
# then the agent can only configure one router, otherwise
# each router's SNAT rules will be in their own namespace
ri.iptables_manager.ipv4['nat'].empty_chain('POSTROUTING')
ri.iptables_manager.ipv4['nat'].empty_chain('snat')
# Add back the jump to float-snat
ri.iptables_manager.ipv4['nat'].add_rule('snat', '-j $float-snat')
# And add them back if the action if add_rules
if action == 'add_rules' and ex_gw_port:
# ex_gw_port should not be None in this case
# NAT rules are added only if ex_gw_port has an IPv4 address
for ip_addr in ex_gw_port['fixed_ips']:
ex_gw_ip = ip_addr['ip_address']
if netaddr.IPAddress(ex_gw_ip).version == 4:
rules = self.external_gateway_nat_rules(ex_gw_ip,
internal_cidrs,
interface_name)
for rule in rules:
ri.iptables_manager.ipv4['nat'].add_rule(*rule)
break
ri.iptables_manager.apply()
def process_router_floating_ip_nat_rules(self, ri):
"""Configure NAT rules for the router's floating IPs.
Configures iptables rules for the floating ips of the given router
"""
# Clear out all iptables rules for floating ips
ri.iptables_manager.ipv4['nat'].clear_rules_by_tag('floating_ip')
# Loop once to ensure that floating ips are configured.
for fip in ri.router.get(l3_constants.FLOATINGIP_KEY, []):
# Rebuild iptables rules for the floating ip.
fixed = fip['fixed_ip_address']
fip_ip = fip['floating_ip_address']
for chain, rule in self.floating_forward_rules(fip_ip, fixed):
ri.iptables_manager.ipv4['nat'].add_rule(chain, rule,
tag='floating_ip')
ri.iptables_manager.apply()
def process_router_floating_ip_addresses(self, ri, ex_gw_port):
"""Configure IP addresses on router's external gateway interface.
Ensures addresses for existing floating IPs and cleans up
those that should not longer be configured.
"""
fip_statuses = {}
interface_name = self.get_external_device_name(ex_gw_port['id'])
device = ip_lib.IPDevice(interface_name, self.root_helper,
namespace=ri.ns_name)
existing_cidrs = set([addr['cidr'] for addr in device.addr.list()])
new_cidrs = set()
# Loop once to ensure that floating ips are configured.
for fip in ri.router.get(l3_constants.FLOATINGIP_KEY, []):
fip_ip = fip['floating_ip_address']
ip_cidr = str(fip_ip) + FLOATING_IP_CIDR_SUFFIX
new_cidrs.add(ip_cidr)
if ip_cidr not in existing_cidrs:
net = netaddr.IPNetwork(ip_cidr)
try:
device.addr.add(net.version, ip_cidr, str(net.broadcast))
except (processutils.UnknownArgumentError,
processutils.ProcessExecutionError):
# any exception occurred here should cause the floating IP
# to be set in error state
fip_statuses[fip['id']] = (
l3_constants.FLOATINGIP_STATUS_ERROR)
LOG.warn(_("Unable to configure IP address for "
"floating IP: %s"), fip['id'])
continue
# As GARP is processed in a distinct thread the call below
# won't raise an exception to be handled.
self._send_gratuitous_arp_packet(
ri, interface_name, fip_ip)
fip_statuses[fip['id']] = (
l3_constants.FLOATINGIP_STATUS_ACTIVE)
# Clean up addresses that no longer belong on the gateway interface.
for ip_cidr in existing_cidrs - new_cidrs:
if ip_cidr.endswith(FLOATING_IP_CIDR_SUFFIX):
net = netaddr.IPNetwork(ip_cidr)
device.addr.delete(net.version, ip_cidr)
return fip_statuses
def _get_ex_gw_port(self, ri):
return ri.router.get('gw_port')
def _arping(self, ri, interface_name, ip_address):
arping_cmd = ['arping', '-A',
'-I', interface_name,
'-c', self.conf.send_arp_for_ha,
ip_address]
try:
ip_wrapper = ip_lib