# neutron/neutron/agent/l3/extensions/port_forwarding.py
#
# 465 lines
# 20 KiB
# Python

# Copyright 2018 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import netaddr
from neutron_lib.agent import l3_extension
from neutron_lib import constants
from neutron_lib import rpc as n_rpc
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron.agent.linux import ip_lib
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import coordination
# Module-level logger.
LOG = logging.getLogger(__name__)
# nat-table chain that PREROUTING jumps to; it holds one jump per port
# forwarding chain.
DEFAULT_PORT_FORWARDING_CHAIN = 'fip-pf'
# Prefix of the tag attached to the iptables rules of one port forwarding.
PORT_FORWARDING_PREFIX = 'fip_portforwarding-'
# Prefix of the per port forwarding iptables chain name.
PORT_FORWARDING_CHAIN_PREFIX = 'pf-'
class RouterFipPortForwardingMapping(object):
    """In-memory cache of the port forwardings handled by this agent.

    Three structures are kept in step with each other:

    * ``managed_port_forwardings``: port forwarding id -> port forwarding
      object.
    * ``fip_port_forwarding``: floating IP id -> set of port forwarding ids.
    * ``router_fip_mapping``: router id -> set of floating IP ids.

    Every public method runs under the 'port-forwarding-cache' lock.
    """

    def __init__(self):
        self.managed_port_forwardings = {}

        # fip_port_forwarding = {
        #     fip_id_1: set(pf_id1, pf_id2),
        #     fip_id_2: set(pf_id3, pf_id4)
        # }
        self.fip_port_forwarding = collections.defaultdict(set)

        # router_fip_mapping = {
        #     router_id_1: set(fip_id_1, fip_id_2),
        #     router_id_2: set(fip_id_3, fip_id_4)
        # }
        self.router_fip_mapping = collections.defaultdict(set)

    @lockutils.synchronized('port-forwarding-cache')
    def set_port_forwardings(self, port_forwardings):
        """Record every given port forwarding in all three structures."""
        for pf in port_forwardings:
            self._set_fip_port_forwarding(
                pf.floatingip_id, pf, pf.router_id)

    @lockutils.synchronized('port-forwarding-cache')
    def update_port_forwardings(self, port_forwardings):
        """Replace the cached object of every given port forwarding."""
        for pf in port_forwardings:
            self.managed_port_forwardings[pf.id] = pf

    @lockutils.synchronized('port-forwarding-cache')
    def del_port_forwardings(self, port_forwardings):
        """Forget the given port forwardings.

        Entries that are not cached are ignored. A floating IP is dropped
        from the cache once its last port forwarding is gone, and a router
        once its last floating IP is gone.
        """
        for pf in port_forwardings:
            if not self.managed_port_forwardings.get(pf.id):
                continue
            self.managed_port_forwardings.pop(pf.id)

            fip_id = pf.floatingip_id
            self.fip_port_forwarding[fip_id].discard(pf.id)
            if self.fip_port_forwarding[fip_id]:
                # The floating IP still carries other port forwardings.
                continue
            self.fip_port_forwarding.pop(fip_id)

            router_id = pf.router_id
            self.router_fip_mapping[router_id].discard(fip_id)
            if not self.router_fip_mapping[router_id]:
                del self.router_fip_mapping[router_id]

    def _set_fip_port_forwarding(self, fip_id, pf, router_id):
        # Callers are expected to hold the cache lock already.
        self.router_fip_mapping[router_id].add(fip_id)
        self.fip_port_forwarding[fip_id].add(pf.id)
        self.managed_port_forwardings[pf.id] = pf

    @lockutils.synchronized('port-forwarding-cache')
    def clear_by_fip(self, fip_id, router_id):
        """Drop a floating IP and all of its port forwardings."""
        router_fips = self.router_fip_mapping[router_id]
        router_fips.discard(fip_id)
        if not router_fips:
            del self.router_fip_mapping[router_id]
        for pf_id in self.fip_port_forwarding[fip_id]:
            del self.managed_port_forwardings[pf_id]
        del self.fip_port_forwarding[fip_id]

    @lockutils.synchronized('port-forwarding-cache')
    def check_port_forwarding_changes(self, new_pf):
        """Return True when ``new_pf`` differs from the cached object.

        An id that is not cached compares as changed.
        """
        return self.managed_port_forwardings.get(new_pf.id) != new_pf
class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
    """L3 agent extension that applies floating IP port forwardings.

    Port forwardings reach the extension through two paths: pushed
    notifications (``_handle_notification``) and router events
    (``add_router`` / ``update_router``), which pull the port forwardings
    of the router's floating IPs from the server. Both paths end up in
    ``_process_create`` / ``_process_update`` / ``_process_delete``, which
    program iptables NAT rules and keep ``self.mapping`` up to date.
    """

    SUPPORTED_RESOURCE_TYPES = [resources.PORTFORWARDING]

    def initialize(self, connection, driver_type):
        """Create the pull RPC client, the consumers and the local cache.

        :param connection: not used by this extension.
        :param driver_type: not used by this extension.
        """
        self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
        self._register_rpc_consumers()
        self.mapping = RouterFipPortForwardingMapping()

    def _register_rpc_consumers(self):
        """Subscribe to port forwarding notifications."""
        registry.register(self._handle_notification,
                          resources.PORTFORWARDING)

        self._connection = n_rpc.Connection()
        endpoints = [resources_rpc.ResourcesPushRpcCallback()]
        topic = resources_rpc.resource_type_versioned_topic(
            resources.PORTFORWARDING)
        self._connection.create_consumer(topic, endpoints, fanout=True)
        self._connection.consume_in_threads()

    def consume_api(self, agent_api):
        """Keep the agent API object used to look up router info."""
        self.agent_api = agent_api

    def _handle_notification(self, context, resource_type,
                             forwardings, event_type):
        """Process each notified port forwarding, one at a time."""
        for forwarding in forwardings:
            self._process_port_forwarding_event(
                context, forwarding, event_type)

    def _store_local(self, pf_objs, event_type):
        """Mirror a created/updated/deleted event in the local cache."""
        if event_type == events.CREATED:
            self.mapping.set_port_forwardings(pf_objs)
        elif event_type == events.UPDATED:
            self.mapping.update_port_forwardings(pf_objs)
        elif event_type == events.DELETED:
            self.mapping.del_port_forwardings(pf_objs)

    def _get_fip_rules(self, port_forward, wrap_name):
        """Build the NAT rules for one port forwarding.

        :param port_forward: the port forwarding object.
        :param wrap_name: prefix prepended to the chain name in the jump.
        :returns: a list of two ``(chain, rule)`` tuples: the jump from the
            default port forwarding chain to the per port forwarding
            chain, then the DNAT rule inside that chain.
        """
        chain_rule_list = []
        pf_chain_name = self._get_port_forwarding_chain_name(port_forward.id)
        chain_rule_list.append((DEFAULT_PORT_FORWARDING_CHAIN,
                                '-j %s-%s' %
                                (wrap_name, pf_chain_name)))
        floating_ip_address = str(port_forward.floating_ip_address)
        protocol = port_forward.protocol
        internal_ip_address = str(port_forward.internal_ip_address)
        internal_port = port_forward.internal_port
        external_port = port_forward.external_port
        chain_rule = (pf_chain_name,
                      '-d %s/32 -p %s -m %s --dport %s '
                      '-j DNAT --to-destination %s:%s' % (
                          floating_ip_address, protocol, protocol,
                          external_port, internal_ip_address,
                          internal_port))
        chain_rule_list.append(chain_rule)
        return chain_rule_list

    def _rule_apply(self, iptables_manager, port_forwarding, rule_tag):
        """Stage the NAT rules of one port forwarding.

        Rules previously added under ``rule_tag`` are cleared first, and
        the default chain is installed when it is missing.
        """
        nat_table = iptables_manager.ipv4['nat']
        nat_table.clear_rules_by_tag(rule_tag)
        if DEFAULT_PORT_FORWARDING_CHAIN not in nat_table.chains:
            self._install_default_rules(iptables_manager)

        for chain, rule in self._get_fip_rules(
                port_forwarding, iptables_manager.wrap_name):
            if chain not in nat_table.chains:
                nat_table.add_chain(chain)
            nat_table.add_rule(chain, rule, tag=rule_tag)

    @coordination.synchronized('router-lock-ns-{namespace}')
    def _process_create(self, port_forwardings, ri, interface_name, namespace,
                        iptables_manager):
        """Configure floating IPs and NAT rules for new port forwardings.

        For each port forwarding that is not cached yet, the floating IP is
        added to the external device (or as a VIP when the router has an HA
        port) if it is not configured already. NAT rules are then staged
        for every given port forwarding and applied, the floating IP
        statuses are reported and the cache is updated.
        """
        if not port_forwardings:
            return
        device = ip_lib.IPDevice(interface_name, namespace=namespace)

        is_distributed = ri.router.get('distributed')
        ha_port = ri.router.get(constants.HA_INTERFACE_KEY, None)
        fip_statuses = {}
        for port_forwarding in port_forwardings:
            # check if the port forwarding is managed in this agent from
            # OVO and router rpc.
            if port_forwarding.id in self.mapping.managed_port_forwardings:
                LOG.debug("Skip port forwarding %s for create, as it had been "
                          "managed by agent", port_forwarding.id)
                continue
            # Re-read on every iteration: a previous iteration may have
            # added an address to the device.
            existing_cidrs = ri.get_router_cidrs(device)
            fip_ip = str(port_forwarding.floating_ip_address)
            fip_cidr = str(netaddr.IPNetwork(fip_ip))
            status = ''
            if fip_cidr not in existing_cidrs:
                try:
                    if not is_distributed:
                        fip_statuses[port_forwarding.floatingip_id] = (
                            ri.add_floating_ip(
                                {'floating_ip_address': fip_ip},
                                interface_name, device))
                    else:
                        if not ha_port:
                            device.addr.add(fip_cidr)
                            ip_lib.send_ip_addr_adv_notif(namespace,
                                                          interface_name,
                                                          fip_ip)
                        else:
                            ri._add_vip(fip_cidr, interface_name)
                        status = constants.FLOATINGIP_STATUS_ACTIVE
                except Exception:
                    # Any error will cause the fip status to be set 'ERROR'
                    status = constants.FLOATINGIP_STATUS_ERROR
                    LOG.warning("Unable to configure floating IP %(fip_id)s "
                                "for port forwarding %(pf_id)s",
                                {'fip_id': port_forwarding.floatingip_id,
                                 'pf_id': port_forwarding.id})
            else:
                if not ha_port:
                    ip_lib.send_ip_addr_adv_notif(namespace,
                                                  interface_name,
                                                  fip_ip)
            if status:
                fip_statuses[port_forwarding.floatingip_id] = status

        if ha_port and ha_port['status'] == constants.PORT_STATUS_ACTIVE:
            ri.enable_keepalived()

        for port_forwarding in port_forwardings:
            rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id
            self._rule_apply(iptables_manager, port_forwarding, rule_tag)

        iptables_manager.apply()
        self._sending_port_forwarding_fip_status(ri, fip_statuses)
        self._store_local(port_forwardings, events.CREATED)

    def _sending_port_forwarding_fip_status(self, ri, statuses):
        """Report floating IP statuses to the server; no-op when empty."""
        if not statuses:
            return
        LOG.debug('Sending Port Forwarding floating ip '
                  'statuses: %s', statuses)
        # Update floating IP status on the neutron server
        ri.agent.plugin_rpc.update_floatingip_statuses(
            ri.agent.context, ri.router_id, statuses)

    def _get_resource_by_router(self, ri):
        """Return ``(interface_name, namespace, iptables_manager)``.

        A distributed router uses its SNAT device, SNAT namespace and SNAT
        iptables manager; any other router uses its own.
        """
        is_distributed = ri.router.get('distributed')
        ex_gw_port = ri.get_ex_gw_port()

        if not is_distributed:
            interface_name = ri.get_external_device_interface_name(ex_gw_port)
            namespace = ri.ns_name
            iptables_manager = ri.iptables_manager
        else:
            interface_name = ri.get_snat_external_device_interface_name(
                ex_gw_port)
            namespace = ri.snat_namespace.name
            iptables_manager = ri.snat_iptables_manager

        return interface_name, namespace, iptables_manager

    def _check_if_need_process(self, ri, force=False):
        """Tell whether port forwardings must be processed for a router.

        :param ri: router info, or None when the agent has no such router.
        :param force: True when the request comes from a pushed port
            forwarding object, in which case the router is not required to
            list floating IPs managed by port forwardings. False when the
            request comes from a router event.
        :returns: True when processing should go on, False otherwise.
        """
        if not ri or not ri.get_ex_gw_port() or (
                not force and not ri.fip_managed_by_port_forwardings):
            # agent doesn't hold the router. pass
            # This router doesn't own a gw port. pass
            # This router doesn't hold a port forwarding mapping. pass
            return False

        is_distributed = ri.router.get('distributed')
        agent_mode = ri.agent_conf.agent_mode
        if (is_distributed and
                agent_mode in [constants.L3_AGENT_MODE_DVR_NO_EXTERNAL,
                               constants.L3_AGENT_MODE_DVR]):
            # just support centralized cases
            return False

        if is_distributed and not ri.snat_namespace.exists():
            return False

        return True

    def _process_port_forwarding_event(self, context, port_forwarding,
                                       event_type):
        """Dispatch one pushed port forwarding by event type."""
        router_id = port_forwarding.router_id
        ri = self._get_router_info(router_id)
        if not self._check_if_need_process(ri, force=True):
            return

        (interface_name, namespace,
         iptables_manager) = self._get_resource_by_router(ri)

        if event_type == events.CREATED:
            self._process_create(
                [port_forwarding], ri, interface_name, namespace,
                iptables_manager)
        elif event_type == events.UPDATED:
            self._process_update([port_forwarding], iptables_manager,
                                 interface_name, namespace)
        elif event_type == events.DELETED:
            self._process_delete(
                context, [port_forwarding], ri, interface_name, namespace,
                iptables_manager)

    @coordination.synchronized('router-lock-ns-{namespace}')
    def _process_update(self, port_forwardings, iptables_manager,
                        interface_name, namespace):
        """Re-program the NAT rules of changed port forwardings.

        Port forwardings equal to their cached object are skipped. For the
        others the chain is removed, the conntrack state of the previously
        cached socket is deleted and the new rules are staged. The rules
        are then applied and the cache is updated.
        """
        if not port_forwardings:
            return
        device = ip_lib.IPDevice(interface_name, namespace=namespace)
        for port_forwarding in port_forwardings:
            # check if port forwarding change from OVO and router rpc
            if not self.mapping.check_port_forwarding_changes(port_forwarding):
                LOG.debug("Skip port forwarding %s for update, as there is no "
                          "difference between the memory managed by agent",
                          port_forwarding.id)
                continue
            current_chain = self._get_port_forwarding_chain_name(
                port_forwarding.id)
            iptables_manager.ipv4['nat'].remove_chain(current_chain)
            # An id that is not cached also counts as "changed", so the
            # previous object may be missing; in that case there is no
            # known old socket whose conntrack state could be flushed.
            ori_pf = self.mapping.managed_port_forwardings.get(
                port_forwarding.id)
            if ori_pf is not None:
                device.delete_socket_conntrack_state(
                    str(ori_pf.floating_ip_address), ori_pf.external_port,
                    protocol=ori_pf.protocol)
            rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id
            self._rule_apply(iptables_manager, port_forwarding, rule_tag)
        iptables_manager.apply()
        self._store_local(port_forwardings, events.UPDATED)

    @coordination.synchronized('router-lock-ns-{namespace}')
    def _process_delete(self, context, port_forwardings, ri, interface_name,
                        namespace, iptables_manager):
        """Remove the NAT rules of the given port forwardings.

        After the chains are removed and applied, floating IPs left without
        any port forwarding on the server are unconfigured, and the cache
        is updated.
        """
        if not port_forwardings:
            return
        device = ip_lib.IPDevice(interface_name, namespace=namespace)
        for port_forwarding in port_forwardings:
            current_chain = self._get_port_forwarding_chain_name(
                port_forwarding.id)
            iptables_manager.ipv4['nat'].remove_chain(current_chain)
            fip_address = str(port_forwarding.floating_ip_address)
            device.delete_socket_conntrack_state(
                fip_address, port_forwarding.external_port,
                protocol=port_forwarding.protocol)

        iptables_manager.apply()

        # Use the same CIDR form that _process_create used when adding the
        # address / VIP, so the removal refers to the same value.
        fip_id_cidrs = {
            (pf.floatingip_id,
             str(netaddr.IPNetwork(str(pf.floating_ip_address))))
            for pf in port_forwardings}
        self._sync_and_remove_fip(context, fip_id_cidrs, device, ri)
        self._store_local(port_forwardings, events.DELETED)

    def _sync_and_remove_fip(self, context, fip_id_cidrs, device, ri):
        """Unconfigure floating IPs that have no port forwarding left.

        :param fip_id_cidrs: iterable of ``(floating IP id, cidr)`` pairs.
        :param device: the external IP device.
        :param ri: router info.

        The remaining port forwardings of the floating IPs are pulled from
        the server. Floating IPs without any are removed from the device
        (or from the VIPs when the router has an HA port), reported as
        'DOWN' and cleared from the cache.
        """
        if not fip_id_cidrs:
            return
        ha_port = ri.router.get(constants.HA_INTERFACE_KEY)
        fip_ids = [item[0] for item in fip_id_cidrs]
        pfs = self.resource_rpc.bulk_pull(context, resources.PORTFORWARDING,
                                          filter_kwargs={
                                              'floatingip_id': fip_ids})
        exist_fips = {pf.floatingip_id for pf in pfs}
        fip_status = {}

        for fip_id, fip_cidr in fip_id_cidrs:
            if fip_id not in exist_fips:
                if ha_port:
                    ri._remove_vip(fip_cidr)
                else:
                    device.delete_addr_and_conntrack_state(fip_cidr)
                fip_status[fip_id] = 'DOWN'

        if ha_port:
            ri.enable_keepalived()
        self._sending_port_forwarding_fip_status(ri, fip_status)
        for fip_id in fip_status:
            self.mapping.clear_by_fip(fip_id, ri.router_id)

    def _get_router_info(self, router_id):
        """Return the router info, or None when the agent has none."""
        router_info = self.agent_api.get_router_info(router_id)
        if router_info:
            return router_info
        LOG.debug("Router %s is not managed by this agent. "
                  "It was possibly deleted concurrently.", router_id)

    def _get_port_forwarding_chain_name(self, pf_id):
        """Return the chain name of a port forwarding, cut to max length."""
        chain_name = PORT_FORWARDING_CHAIN_PREFIX + pf_id
        return chain_name[:constants.MAX_IPTABLES_CHAIN_LEN_WRAP]

    def _install_default_rules(self, iptables_manager):
        """Create the default chain, jump to it from PREROUTING, apply."""
        default_rule = '-j %s-%s' % (iptables_manager.wrap_name,
                                     DEFAULT_PORT_FORWARDING_CHAIN)
        iptables_manager.ipv4['nat'].add_chain(DEFAULT_PORT_FORWARDING_CHAIN)
        iptables_manager.ipv4['nat'].add_rule('PREROUTING', default_rule)
        iptables_manager.apply()

    def check_local_port_forwardings(self, context, ri, fip_ids):
        """Reconcile the cache of one router with the server.

        The port forwardings of ``fip_ids`` are pulled from the server and
        split into new and updated ones. Cached port forwardings of this
        router that were not returned are removed.
        """
        pfs = self.resource_rpc.bulk_pull(context, resources.PORTFORWARDING,
                                          filter_kwargs={
                                              'floatingip_id': fip_ids})

        (interface_name, namespace,
         iptable_manager) = self._get_resource_by_router(ri)
        # Only this router's cached entries are candidates for removal;
        # the cache is shared by every router hosted on the agent.
        local_pfs = {
            pf_id for pf_id, pf in
            self.mapping.managed_port_forwardings.items()
            if pf.router_id == ri.router_id}
        new_pfs = []
        updated_pfs = []
        current_pfs = set()
        for pf in pfs:
            # check the request port forwardings, and split them into
            # update, new, current part from router rpc
            if pf.id in self.mapping.managed_port_forwardings:
                if self.mapping.check_port_forwarding_changes(pf):
                    updated_pfs.append(pf)
            else:
                new_pfs.append(pf)
            current_pfs.add(pf.id)

        remove_pf_ids_set = local_pfs - current_pfs
        remove_pfs = [self.mapping.managed_port_forwardings[pf_id]
                      for pf_id in remove_pf_ids_set]
        self._process_update(updated_pfs, iptable_manager,
                             interface_name, namespace)
        self._process_create(new_pfs, ri, interface_name,
                             namespace, iptable_manager)
        self._process_delete(context, remove_pfs, ri, interface_name,
                             namespace, iptable_manager)

    def process_port_forwarding(self, context, data):
        """Reconcile the port forwardings of the router in ``data``.

        :param context: RPC context.
        :param data: Router data; only ``data['id']`` is read here.
        """
        ri = self._get_router_info(data['id'])
        if not self._check_if_need_process(ri):
            return

        self.check_local_port_forwardings(
            context, ri, ri.fip_managed_by_port_forwardings)

    def add_router(self, context, data):
        """Handle a router add event.

        Called on router create.

        :param context: RPC context.
        :param data: Router data.
        """
        self.process_port_forwarding(context, data)

    def update_router(self, context, data):
        """Handle a router update event.

        Called on router update.

        :param context: RPC context.
        :param data: Router data.
        """
        self.process_port_forwarding(context, data)

    def delete_router(self, context, data):
        """Handle a router delete event.

        :param context: RPC context.
        :param data: Router data.
        """
        pass

    def ha_state_change(self, context, data):
        """Handle an HA state change; nothing is done for it here."""
        pass