Ian Wells 8027ba0ad7 Use AFTER_SPAWN not AFTER_INIT in Neutron to start watcher
We want one additional thread in Neutron for the return worker,
and we really only need one additional thread per formward worker.

When we have multiple threads (with AFTER_INIT) this seems to be
upsetting the process of port binding, particularly on router and
DHCP ports, because of the multiple almost simultaneous binding
messages that trigger in the herd of threads.

Change-Id: I68dbd02b8a235b128779357d4bf3df26b5bf604a
2020-08-03 20:48:16 +00:00

1117 lines
48 KiB
Python

# Copyright (c) 2016 Cisco Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from abc import ABC, abstractmethod
from collections import namedtuple
import etcd
import eventlet
import eventlet.event
import os
from oslo_config import cfg
from oslo_log import log as logging
import re
import time
from networking_vpp import config_opts
from networking_vpp import constants as nvpp_const
from networking_vpp.db import db
from networking_vpp import etcdutils
from networking_vpp.ext_manager import ExtensionManager
from networking_vpp.extension import MechDriverExtensionBase
from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
import neutron_lib.constants as n_const
from neutron_lib import context as n_context
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
import neutron.conf.agent.securitygroups_rpc
from neutron.db import provisioning_blocks
LOG = logging.getLogger(__name__)
class VPPMechanismDriver(api.MechanismDriver):
supported_vnic_types = [portbindings.VNIC_NORMAL]
MECH_NAME = 'vpp'
def initialize(self):
config_opts.register_vpp_opts(cfg.CONF)
neutron.conf.agent.securitygroups_rpc.register_securitygroups_opts(
cfg.CONF)
self.allowed_network_types = cfg.CONF.ml2_vpp.network_types
self.communicator = EtcdAgentCommunicator(self.port_bind_complete)
names = names = cfg.CONF.ml2_vpp.driver_extensions
if names != '':
self.mgr = ExtensionManager(
'networking_vpp.driver.extensions',
names,
MechDriverExtensionBase)
self.mgr.call_all('run', self.communicator)
def get_vif_type(self, port_context):
"""Determine the type of the vif to be bound from port context"""
# Default vif type
vif_type = 'vhostuser'
# We have to explicitly avoid binding agent ports - DHCP,
# L3 etc. - as vhostuser. The interface code below takes
# care of those.
owner = port_context.current['device_owner']
for f in n_const.DEVICE_OWNER_PREFIXES:
if owner.startswith(f):
vif_type = 'tap'
LOG.debug("vif_type to be bound is: %s", vif_type)
return vif_type
def bind_port(self, port_context):
"""Attempt to bind a port.
:param port_context: PortContext instance describing the port
This method is called outside any transaction to attempt to
establish a port binding using this mechanism driver. Bindings
may be created at each of multiple levels of a hierarchical
network, and are established from the top level downward. At
each level, the mechanism driver determines whether it can
bind to any of the network segments in the
port_context.segments_to_bind property, based on the value of the
port_context.host property, any relevant port or network
attributes, and its own knowledge of the network topology. At
the top level, port_context.segments_to_bind contains the static
segments of the port's network. At each lower level of
binding, it contains static or dynamic segments supplied by
the driver that bound at the level above. If the driver is
able to complete the binding of the port to any segment in
port_context.segments_to_bind, it must call port_context.set_binding
with the binding details. If it can partially bind the port,
it must call port_context.continue_binding with the network
segments to be used to bind at the next lower level.
If the binding results are committed after bind_port returns,
they will be seen by all mechanism drivers as
update_port_precommit and update_port_postcommit calls. But if
some other thread or process concurrently binds or updates the
port, these binding results will not be committed, and
update_port_precommit and update_port_postcommit will not be
called on the mechanism drivers with these results. Because
binding results can be discarded rather than committed,
drivers should avoid making persistent state changes in
bind_port, or else must ensure that such state changes are
eventually cleaned up.
Implementing this method explicitly declares the mechanism
driver as having the intention to bind ports. This is inspected
by the QoS service to identify the available QoS rules you
can use with ports.
"""
LOG.debug("Attempting to bind port %(port)s on "
"network %(network)s",
{'port': port_context.current['id'],
'network': port_context.network.current['id']})
vnic_type = port_context.current.get(portbindings.VNIC_TYPE,
portbindings.VNIC_NORMAL)
if vnic_type not in self.supported_vnic_types:
LOG.debug("Refusing to bind due to unsupported "
"vnic_type: %s",
vnic_type)
return
for segment in port_context.segments_to_bind:
if self.check_segment(segment, port_context.host):
vif_details = {}
# TODO(ijw) should be in a library that the agent uses
vif_type = self.get_vif_type(port_context)
if vif_type == 'vhostuser':
vif_details['vhostuser_socket'] = \
os.path.join(cfg.CONF.ml2_vpp.vhost_user_dir,
port_context.current['id'])
vif_details['vhostuser_mode'] = 'server'
LOG.debug('Setting details: %s', vif_details)
port_context.set_binding(segment[api.ID],
vif_type,
vif_details)
LOG.debug("Bind selected using segment: %s", segment)
return
def check_segment(self, segment, host):
"""Check if segment can be bound.
:param segment: segment dictionary describing segment to bind
:param host: host on which the segment must be bound to a port
:returns: True iff segment can be bound for host
"""
# TODO(ijw): naive - doesn't check host, or configured
# physnets on the host. Should work out if the binding
# can't be achieved before accepting it
network_type = segment[api.NETWORK_TYPE]
if network_type not in self.allowed_network_types:
LOG.debug(
'Network %(network_id)s is %(network_type)s, '
'but this driver only supports types '
'%(allowed_network_types)s. '
'The type must be supported if binding is to succeed.',
{'network_id': segment['id'],
'network_type': network_type,
'allowed_network_types':
', '.join(self.allowed_network_types)}
)
return False
if network_type in [n_const.TYPE_FLAT,
n_const.TYPE_VLAN]:
physnet = segment[api.PHYSICAL_NETWORK]
if not self.physnet_known(host, physnet):
LOG.debug(
'Network %(network_id)s is on physical '
'network %(physnet)s, but the physical network '
'is not one the host %(host)s has attached.',
{'network_id': segment['id'],
'physnet': physnet,
'host': host}
)
return False
return True
def physnet_known(self, host, physnet):
return self.communicator.find_physnet(host, physnet)
def check_vlan_transparency(self, port_context):
"""Check if the network supports vlan transparency.
:param port_context: NetworkContext instance describing the network.
In general we do not support VLAN transparency (yet).
"""
return False
def update_port_precommit(self, port_context):
"""Work to do, during DB commit, when updating a port
If we are partially responsible for binding this port, we will
have to tell our agents they have work to do. This is an
operation within a distributed system and can therefore take
time to complete, or potentially even fail. Instead, we log
the requirement to a journal.
"""
# TODO(ijw): optimisation: the update port may leave the
# binding state the same as before if someone updated
# something other than the binding on the port, but this
# way we always send it out and it's the far end's job to
# ignore it. Doing less work is nevertheless good, so we
# should in future avoid the send.
# unbind port from old host, if already bound
if port_context.original_binding_levels is not None:
prev_bind = port_context.original_binding_levels[-1]
if (prev_bind is not None and
prev_bind.get(api.BOUND_DRIVER) == self.MECH_NAME and
port_context.host != port_context.original_host):
# Note that we skip this step if the change happens while
# 'unbinding' and rebinding to the same host - it's probably
# an update of extraneous detail and not really a request
# that requires binding.
self.communicator.unbind(port_context._plugin_context.session,
port_context.original,
port_context.original_host,
prev_bind[api.BOUND_SEGMENT]
)
# (Re)bind port to the new host, if it needs to be bound
if port_context.binding_levels is not None:
current_bind = port_context.binding_levels[-1]
if (current_bind is not None and
current_bind.get(api.BOUND_DRIVER) == self.MECH_NAME):
binding_type = self.get_vif_type(port_context)
# Remove port membership from any previously associated
# security groups for updating remote_security_group_id ACLs
self.communicator.remove_port_from_remote_groups(
port_context._plugin_context.session,
port_context.original,
port_context.current)
self.communicator.bind(port_context._plugin_context.session,
port_context.current,
current_bind[api.BOUND_SEGMENT],
port_context.host,
binding_type)
# TODO(ijW): The agent driver checks for a change of
# host, but we're oddly seeing that the orig_host is
# always set. Should confirm if this is a problem or
# not.
self._insert_provisioning_block(port_context)
# If this is part of a successful live migration where the
# target compute's nova has invoked update_port() binding the
# port to the target compute and removing the 'migrating_to'
# key from the binding:profile attribute then this is our
# chance to remove etcd entries of the old port binding
if (port_context.original is not None and
port_context.original_host is not None and
port_context.original_host != port_context.host and
'migrating_to' not in
port_context.current['binding:profile'] and
'migrating_to' in
port_context.original['binding:profile'] and
port_context.original['binding:profile']['migrating_to']
== port_context.host):
self.communicator.unbind(
port_context._plugin_context.session,
port_context.original,
port_context.original_host,
current_bind[api.BOUND_SEGMENT]
)
def port_bind_complete(self, port_id, host):
"""Tell things that the port is truly bound.
This is a callback called by the etcd communicator.
"""
LOG.debug('bind complete on %s', port_id)
self._release_provisioning_block(host, port_id)
def _insert_provisioning_block(self, context):
# we insert a status barrier to prevent the port from transitioning
# to active until the agent reports back that the wiring is done
port = context.current
if port['status'] == n_const.PORT_STATUS_ACTIVE:
# no point in putting in a block if the status is already ACTIVE
return
provisioning_blocks.add_provisioning_component(
context._plugin_context, port['id'], resources.PORT,
provisioning_blocks.L2_AGENT_ENTITY)
def _release_provisioning_block(self, host, port_id):
context = n_context.get_admin_context()
provisioning_blocks.provisioning_complete(
context, port_id, resources.PORT,
provisioning_blocks.L2_AGENT_ENTITY)
def update_port_postcommit(self, port_context):
"""Work to do, post-DB commit, when updating a port
After accepting an update_port, determine if we logged any
work to do on the networking devices, and - if so - kick the
update thread that tells our minions.
"""
# TODO(ijw): optimisation: the update port may leave the
# binding state the same as before if someone updated
# something other than the binding on the port, but this
# way we always send it out and it's the far end's job to
# ignore it. Doing less work is nevertheless good, so we
# should in future avoid the send.
if port_context.binding_levels is not None:
current_bind = port_context.binding_levels[-1]
if port_context.original_binding_levels is None:
prev_bind = None
else:
prev_bind = port_context.original_binding_levels[-1]
if (current_bind is not None and
current_bind.get(api.BOUND_DRIVER) == self.MECH_NAME):
self.communicator.kick()
elif (prev_bind is not None and
prev_bind.get(api.BOUND_DRIVER) == self.MECH_NAME):
self.communicator.kick()
def delete_port_precommit(self, port_context):
port = port_context.current
host = port_context.host
# NB: Host is typically '' if the port is not bound
# A port can be in an invalid state with a host context
if host and port_context.binding_levels:
segment = port_context.binding_levels[-1][api.BOUND_SEGMENT]
self.communicator.unbind(port_context._plugin_context.session,
port, host, segment)
def delete_port_postcommit(self, port_context):
self.communicator.kick()
class AgentCommunicator(ABC):
@abstractmethod
def bind(self, port, segment, host, binding_type):
pass
@abstractmethod
def unbind(self, port, host):
pass
# Our prefix for etcd keys, in case others are using etcd.
LEADIN = nvpp_const.LEADIN # TODO(ijw): make configurable?
# Model for representing a security group
SecurityGroup = namedtuple(
'SecurityGroup', ['id', 'ingress_rules', 'egress_rules']
)
# Model for a VPP security group rule
SecurityGroupRule = namedtuple(
'SecurityGroupRule', ['is_ipv6', 'remote_ip_addr',
'ip_prefix_len', 'remote_group_id',
'protocol', 'port_min', 'port_max']
)
class JournalManager(object):
"""Perform journal management tasks.
Handles the post-journal part of the work that clears out the table and
updates etcd. Also kicks the forward worker threads that is active and
tells it that it has work to do.
"""
def __init__(self):
self.client_factory = etcdutils.EtcdClientFactory(cfg.CONF.ml2_vpp)
self.election_key_space = LEADIN + '/election'
self.journal_kick_key = self.election_key_space + '/kick-journal'
LOG.debug('Journal manager init complete')
def kick(self):
# A thread in one Neutron process - possibly not this one -
# is waiting to send updates from the DB to etcd. Wake it.
try:
# TODO(ijw): got to be a more efficient way to create
# a client for each thread
# From a Neutron thread, we need to tell whichever of the
# forwarder threads that is active that it has work to do.
self.client_factory.client().write(self.journal_kick_key, '')
except etcd.EtcdException:
# A failed wake is not the end of the world.
pass
######################################################################
# The post-journal part of the work that clears out the table and
# updates etcd.
def make_forward_worker(self):
# Assign a UUID to each worker thread to enable thread election
return eventlet.spawn(self._forward_worker)
def do_etcd_update(self, etcd_writer, k, v):
with eventlet.Timeout(cfg.CONF.ml2_vpp.etcd_write_time, False):
if v is None:
etcd_writer.delete(k)
else:
etcd_writer.write(k, v)
def _forward_worker(self):
LOG.debug('forward worker begun')
etcd_client = self.client_factory.client()
etcd_writer = etcdutils.json_writer(etcd_client)
lease_time = cfg.CONF.ml2_vpp.forward_worker_master_lease_time
recovery_time = cfg.CONF.ml2_vpp.forward_worker_recovery_time
etcd_election = etcdutils.EtcdElection(etcd_client, 'forward_worker',
self.election_key_space,
work_time=lease_time,
recovery_time=recovery_time)
while True:
# Try indefinitely to regain the mastery of this thread pool. Most
# threads will be sitting here
etcd_election.wait_until_elected()
try:
# Master loop - as long as we are master and can
# maintain it, process incoming events.
# Every long running section is preceded by extending
# mastership of the thread pool and followed by
# confirmation that we still have mastership (usually
# by a further extension).
def work(k, v):
self.do_etcd_update(etcd_writer, k, v)
# We will try to empty the pending rows in the DB
while True:
etcd_election.extend_election(
cfg.CONF.ml2_vpp.db_query_time)
session = n_context.get_admin_context().session
maybe_more = db.journal_read(session, work)
if not maybe_more:
LOG.debug('forward worker has emptied journal')
etcd_election.extend_election(lease_time)
break
# work queue is now empty.
# Wait to be kicked, or (in case of emergency) run
# every few seconds in case another thread or process
# dumped work and failed to get notification to us to
# process it.
with eventlet.Timeout(lease_time + 1, False):
etcd_election.extend_election(lease_time)
try:
etcd_client.watch(self.journal_kick_key,
timeout=lease_time)
except etcd.EtcdException:
# Check the DB queue now, anyway
pass
except etcdutils.EtcdElectionLost:
# We are no longer master
pass
except Exception as e:
# TODO(ijw): log exception properly
LOG.warning("problems in forward worker - Error name is %s. "
"proceeding without quiting", type(e).__name__)
LOG.warning("Exception in forward_worker: %s", e)
# something went bad; breathe, in case we end
# up in a tight loop
time.sleep(1)
# never quit
pass
######################################################################
class EtcdAgentCommunicator(AgentCommunicator, JournalManager):
"""Comms unit for etcd
This will talk to etcd and tell it what is going on
with the Neutron database. etcd can be run as a
cluster and so shouldn't die, but can be unreachable,
so this class's job is to ensure that all updates are
forwarded to etcd in order even when things are not
quite going as planned.
In etcd, the layout is:
# Port Space
LEADIN/nodes - subdirs are compute nodes
LEADIN/nodes/X/ports - entries are JSON-ness containing
all information on each bound port on the compute node.
(Unbound ports are homeless, so the act of unbinding is
the deletion of this entry.)
# Global Space
LEADIN/global/secgroups/<security-group-id> - entries are security-group
keys whose values contain all the rule data
LEADIN/global/networks/gpe/<vni>/<hostname>/<mac>/<ip-address> - Entries
contain GPE data such as the VNI, hostname, instance's mac & IP address
and key value is the underlay IP address of the compute node
LEADIN/global/remote_group/<group-id>/<port-id> contain the IP addresses
of ports in a security-group
# State Space
LEADIN/state/X - return state of the VPP
LEADIN/state/X/alive - heartbeat back
LEADIN/state/X/ports - port information.
LEADIN/state/X/physnets - physnets on node
Specifically a key here (regardless of value) indicates
the port has been bound and is receiving traffic.
"""
def __init__(self, notify_bound):
super(EtcdAgentCommunicator, self).__init__()
LOG.debug("Using etcd host:%s port:%s user:%s",
cfg.CONF.ml2_vpp.etcd_host,
cfg.CONF.ml2_vpp.etcd_port,
cfg.CONF.ml2_vpp.etcd_user)
# This is a function that is called when a port has been
# notified from the agent via etcd as completely attached.
# We call this when we're certain that the VPP on the far end
# has definitely bound the port, and has dropped a vhost-user
# socket where it can be found.
# This is more important than it seems, becaus libvirt will
# hang, because qemu ignores its monitor port, when qemu is
# waiting for a partner to connect with on its vhost-user
# interfaces. It can't start the VM - that requires
# information from its partner it can't guess at - but it
# shouldn't hang the monitor - nevertheless... So we notify
# when the port is there and ready, and qemu is never put into
# this state by Nova.
self.notify_bound = notify_bound
# We need certain directories to exist
self.state_key_space = LEADIN + '/state'
self.port_key_space = LEADIN + '/nodes'
self.secgroup_key_space = LEADIN + '/global/secgroups'
self.remote_group_key_space = LEADIN + '/global/remote_group'
self.gpe_key_space = LEADIN + '/global/networks/gpe'
etcd_client = self.client_factory.client()
etcd_helper = etcdutils.EtcdHelper(etcd_client)
etcd_helper.ensure_dir(self.state_key_space)
etcd_helper.ensure_dir(self.port_key_space)
etcd_helper.ensure_dir(self.secgroup_key_space)
etcd_helper.ensure_dir(self.election_key_space)
etcd_helper.ensure_dir(self.remote_group_key_space)
self.secgroup_enabled = cfg.CONF.SECURITYGROUP.enable_security_group
if self.secgroup_enabled:
self.register_secgroup_event_handler()
# TODO(ijw): .../state/<host> lists all known hosts, and they
# heartbeat when they're functioning
# From this point on, there are multiple threads: ensure that
# we don't re-use the etcd_client from multiple threads
# simultaneously
etcd_helper = None
etcd_client = None
registry.subscribe(self.start_threads,
resources.PROCESS,
events.AFTER_SPAWN)
def start_threads(self, resource, event, trigger, payload=None):
LOG.debug('Starting background threads for Neutron worker')
self.return_thread = self.make_return_worker()
self.forward_thread = self.make_forward_worker()
def find_physnet(self, host, physnet):
"""Identify if an agent can connect to the physical network
This can fail if the agent hasn't been configured for that
physnet, or if the agent isn't alive.
"""
# TODO(ijw): we use an on-the-fly created client so that we
# don't have threading problems from the caller.
try:
etcd_client = self.client_factory.client()
etcd_client.read('%s/state/%s/alive' % (LEADIN, host))
etcd_client.read('%s/state/%s/physnets/%s' %
(LEADIN, host, physnet))
except etcd.EtcdKeyNotFound:
return False
return True
def register_secgroup_event_handler(self):
"""Subscribe a handler to process secgroup change notifications
We're interested in PRECOMMIT_xxx (where we store the changes
to etcd in the journal table) and AFTER_xxx (where we
remind the worker thread it may have work to do).
"""
LOG.info("Security groups feature is enabled")
# NB security group rules cannot be updated, and security
# groups themselves have no forwarder state in them, so we
# don't need the update events
# register pre-commit events
# security group precommit events
registry.subscribe(self.process_secgroup_commit,
resources.SECURITY_GROUP,
events.PRECOMMIT_CREATE)
registry.subscribe(self.process_secgroup_commit,
resources.SECURITY_GROUP,
events.PRECOMMIT_DELETE)
# security group rule precommit events
registry.subscribe(self.process_secgroup_commit,
resources.SECURITY_GROUP_RULE,
events.PRECOMMIT_CREATE)
registry.subscribe(self.process_secgroup_commit,
resources.SECURITY_GROUP_RULE,
events.PRECOMMIT_DELETE)
# register post-commit events
# security group post commit events
registry.subscribe(self.process_secgroup_after,
resources.SECURITY_GROUP,
events.AFTER_CREATE)
registry.subscribe(self.process_secgroup_after,
resources.SECURITY_GROUP,
events.AFTER_DELETE)
# security group rule post commit events
registry.subscribe(self.process_secgroup_after,
resources.SECURITY_GROUP_RULE,
events.AFTER_CREATE)
registry.subscribe(self.process_secgroup_after,
resources.SECURITY_GROUP_RULE,
events.AFTER_DELETE)
def process_secgroup_after(self, resource, event, trigger, **kwargs):
"""Callback for handling security group/rule commit-complete events
This is when we should tell other things that a change has
happened and has been recorded permanently in the DB.
"""
# Whatever the object that caused this, we've put something
# in the journal and now need to nudge the communicator
self.kick()
def process_secgroup_commit(self, resource, event, trigger, **kwargs):
"""Callback for handling security group/rule commit events
This is the time at which we should be committing any of our
own auxiliary changes to the DB.
"""
LOG.debug("Received event %s notification for resource"
" %s with kwargs %s", event, resource, kwargs)
context = kwargs['context']
sgid = None
deleted_rule_id = None
added_rule = {}
if resource == resources.SECURITY_GROUP:
if event == events.PRECOMMIT_DELETE:
self.delete_secgroup_from_etcd(context.session,
kwargs['security_group_id'])
# Job done.
return
# All other event types drop out to the update-the-key
# code at the bottom of the function.
elif event == events.PRECOMMIT_CREATE:
# Also, the SG passed to us is what comes in from the user. We
# require what went into the DB (where we added a UUID to it),
# so we have to get that from the record of changed and as yet
# uncommitted objects.
sgid = kwargs[resource]['id']
# The default rules are supplied directly in the context
rules = kwargs[resource]['security_group_rules']
elif resource == resources.SECURITY_GROUP_RULE:
# We store security groups with a composite of all their
# rules. So in this case we track down the affected
# rule and update its entire data.
# NB: rules are never updated.
if event == events.PRECOMMIT_DELETE:
deleted_rule_id = kwargs['security_group_rule_id']
sgid = kwargs['security_group_id']
elif event == events.PRECOMMIT_CREATE:
# Groups don't have the same UUID problem - we're not
# using their UUID, we're using their SG's, which must
# be present.
added_rule = kwargs['security_group_rule']
sgid = added_rule['security_group_id']
# Start by getting the rules the DB knows about
rules = self.get_rules_for_secgroup(sgid, context)
# Adding a group or changing rules results in the same behaviour:
# rewrite the key in etcd.
# If we're in the precommit part, we may have deleted rules in this
# list and we should exclude them. We also, cautiously, exclude any
# added rule as well, though it should not have made it into the DB.
# If either is unset it's None, and != None is not a test that will
# pass.
rules = [r for r in rules
if r['id'] != deleted_rule_id
and r['id'] != added_rule.get('id')]
if added_rule:
rules.append(added_rule)
# Get the full details of the secgroup in etcd-exchange format
secgroup = self.get_secgroup_from_rules(sgid, rules)
# Write security group data to etcd
self.send_secgroup_to_agents(context.session, secgroup)
def get_rules_for_secgroup(self, sgid, context):
plugin = directory.get_plugin()
rules = plugin.get_security_group_rules(
context, filters={'security_group_id': [sgid]}
)
return rules
def get_secgroup_rule(self, rule_id, context):
"""Fetch and return a security group rule from Neutron DB"""
plugin = directory.get_plugin()
return plugin.get_security_group_rule(context, rule_id)
def get_secgroup_from_rules(self, sgid, rules):
"""Build and return a security group namedtuple object.
This object is the format with which we exchange data with
the agents, and can be written in this form to etcd.
Arguments:
sgid - ID of the security group
rules - A list of security group rules as returned from the DB
1. Filter rules using the input param: sgid to ensure that rules
belong to that group
2. Split rules based on direction.
3. Construct and return the SecurityGroup namedtuple.
"""
# A generator object of security group rules for sgid
sg_rules = (r for r in rules if r['security_group_id'] == sgid)
# A list of ingress and egress namedtuple rule objects
ingress_rules = []
egress_rules = []
for r in sg_rules:
if r['direction'] == 'ingress':
ingress_rules.append(self._neutron_rule_to_vpp_acl(r))
else:
egress_rules.append(self._neutron_rule_to_vpp_acl(r))
return SecurityGroup(sgid, ingress_rules, egress_rules)
# Neutron supports the following IP protocols by name
protocols = {'tcp': 6, 'udp': 17, 'icmp': 1, 'icmpv6': 58,
'ah': 51, 'dccp': 33, 'egp': 8, 'esp': 50, 'gre': 47,
'igmp': 2, 'ipv6-encap': 41, 'ipv6-frag': 44,
'ipv6-icmp': 58, 'ipv6-nonxt': 59, 'ipv6-opts': 60,
'ipv6-route': 43, 'ospf': 89, 'pgm': 113, 'rsvp': 46,
'sctp': 132, 'udplite': 136,
'vrrp': 112}
def _neutron_rule_to_vpp_acl(self, rule):
"""Convert a neutron rule to vpp_acl rule.
Arguments:
1. rule -- represents a neutron rule
- Convert the neutron rule to a vpp_acl rule model
- Return the SecurityGroupRule namedtuple.
"""
is_ipv6 = 0 if rule['ethertype'] == 'IPv4' else 1
if rule['protocol'] is None:
# Neutron uses None to represent any protocol
# We use 0 to represent any protocol
protocol = 0
elif rule['protocol'] in self.protocols:
# VPP rules require IANA protocol numbers
# Convert input accordingly.
protocol = self.protocols[rule['protocol']]
else:
# Convert incoming string value to an integer
protocol = int(rule['protocol'])
if is_ipv6 and protocol == self.protocols['icmp']:
protocol = self.protocols['icmpv6']
# Neutron represents any ip address by setting
# both the remote_ip_prefix and remote_group_id fields to None
# VPP uses all zeros to represent any Ipv4/IpV6 address
# In a neutron security group rule, you can either set the
# remote_ip_prefix or remote_group_id but not both.
# When a remote_ip_prefix value is set, the remote_group_id
# is ignored and vice versa. If both the attributes are unset,
# any remote_ip_address is permitted.
if rule['remote_ip_prefix']:
remote_ip_addr, ip_prefix_len = rule['remote_ip_prefix'
].split('/')
ip_prefix_len = int(ip_prefix_len)
# Set the required attribute, referenced by the SecurityGroupRule
# tuple, remote_group_id to None.
remote_group_id = None
elif rule['remote_group_id']:
remote_group_id = rule['remote_group_id']
# Set remote_ip_addr and ip_prefix_len to empty values
# as it is a required attribute referenced by the
# SecurityGroupRule tuple. When the remote_ip_addr value is set
# to None, the vpp-agent ignores it and looks at the
# remote-group-id. One of these attributes must be set to a valid
# value.
remote_ip_addr, ip_prefix_len = None, 0
else:
# In neutron, when both the remote_ip_prefix and remote-group-id
# are set to None, it implies permit any. But we need to set a
# valid value for the remote_ip_address attribute to tell the
# vpp-agent.
remote_ip_addr = '0.0.0.0' if not is_ipv6 else '::'
ip_prefix_len = 0
# Set the required attribute in the SecurityGroupRule tuple
# remote_group_id to None.
remote_group_id = None
# Neutron uses -1 or None or 0 to represent all ports
# VPP uses 0-65535 for all tcp/udp ports, Use -1 to represent all
# ranges for ICMP types and codes
if rule['port_range_min'] == -1 or not rule['port_range_min']:
# Valid TCP/UDP port ranges for TCP(6), UDP(17) and UDPLite(136)
if protocol in [6, 17, 136]:
port_min, port_max = (0, 65535)
# A Value of -1 represents all ICMP/ICMPv6 types & code ranges
elif protocol in [1, 58]:
port_min, port_max = (-1, -1)
# Ignore port_min and port_max fields as other protocols don't
# use them
else:
port_min, port_max = (0, 0)
else:
port_min, port_max = (rule['port_range_min'],
rule['port_range_max'])
# Handle a couple of special ICMP cases
if protocol in [1, 58]:
# All ICMP types and codes
if rule['port_range_min'] is None:
port_min, port_max = (-1, -1)
# All codes for a specific type
elif rule['port_range_max'] is None:
port_min, port_max = (rule['port_range_min'], -1)
sg_rule = SecurityGroupRule(is_ipv6, remote_ip_addr,
ip_prefix_len,
remote_group_id, protocol, port_min,
port_max)
return sg_rule
def send_secgroup_to_agents(self, session, secgroup):
"""Writes a secgroup to the etcd secgroup space
Does this via the journal as part of the commit, so
that the write is atomic with the DB commit to the
Neutron tables.
Arguments:
session -- the DB session with an open transaction
secgroup -- Named tuple representing a SecurityGroup
"""
secgroup_path = self._secgroup_path(secgroup.id)
# sg is a dict of of ingress and egress rule lists
sg = {}
ingress_rules = []
egress_rules = []
for ingress_rule in secgroup.ingress_rules:
ingress_rules.append(ingress_rule._asdict())
for egress_rule in secgroup.egress_rules:
egress_rules.append(egress_rule._asdict())
sg['ingress_rules'] = ingress_rules
sg['egress_rules'] = egress_rules
db.journal_write(session, secgroup_path, sg)
def delete_secgroup_from_etcd(self, session, secgroup_id):
"""Deletes the secgroup key from etcd
Arguments:
secgroup_id -- The id of the security group that we want to delete
"""
secgroup_path = self._secgroup_path(secgroup_id)
# Delete the security-group from remote-groups
remote_group_path = self._remote_group_path(secgroup_id, '')
db.journal_write(session, secgroup_path, None)
db.journal_write(session, remote_group_path, None)
def _secgroup_path(self, secgroup_id):
return self.secgroup_key_space + "/" + secgroup_id
def _port_path(self, host, port):
return self.port_key_space + "/" + host + "/ports/" + port['id']
# TODO(najoy): Move all security groups related code to a dedicated
# module
def _remote_group_path(self, secgroup_id, port_id):
return self.remote_group_key_space + "/" + secgroup_id + "/" + port_id
def _gpe_remote_path(self, host, port, segmentation_id):
ip_addrs = port.get('fixed_ips', [])
gpe_dir = self.gpe_key_space + "/" + str(segmentation_id) + "/" + \
host + "/" + port['mac_address']
if ip_addrs and segmentation_id:
# Delete all GPE keys and the empty GPE directory itself in etcd
return [gpe_dir + "/" + ip_address['ip_address']
for ip_address in ip_addrs] + [gpe_dir]
else:
return []
# A remote_group_path is qualified with a port ID because the agent uses
# the port ID to keep track of the dynamic set of ports associated with
# the remote_group_id. The value of this key is the
# list of IP addresses allocated to that port by neutron.
# Using the above two pieces of information, the vpp-agent
# computes the complete set of IP addresses belonging to a
# remote_group_id and uses it to expand the rule when a remote_group_id
# attribute is specified. The expansion is performed by computing a
# product using all the IP addresses of the ports in the remote_group
# and the remaining attributes of the rule.
def _remote_group_paths(self, port):
security_groups = port.get('security_groups', [])
return [self._remote_group_path(secgroup_id, port['id'])
for secgroup_id in security_groups]
######################################################################
# These functions use a DB journal to log messages before
# they're put into etcd, as that can take time and may not always
# succeed (so is a bad candidate for calling within or after a
# transaction).
def bind(self, session, port, segment, host, binding_type):
# NB segmentation_id is not optional in the wireline protocol,
# we just pass 0 for unsegmented network types
data = {
'mac_address': port['mac_address'],
'mtu': 1500, # not this, but what?: port['mtu'],
'physnet': segment[api.PHYSICAL_NETWORK],
'network_type': segment[api.NETWORK_TYPE],
'segmentation_id': segment.get(api.SEGMENTATION_ID, 0),
'binding_type': binding_type,
'security_groups': port.get('security_groups', []),
'allowed_address_pairs': port.get('allowed_address_pairs', []),
'fixed_ips': port.get('fixed_ips', []),
'port_security_enabled': port.get('port_security_enabled', True),
# Non-essential, but useful for monitoring and debug:
'device_id': port.get('device_id', None)
}
LOG.debug("Queueing bind request for port:%s, "
"segment:%s, host:%s, type:%s",
port, data['segmentation_id'],
host, data['binding_type'])
db.journal_write(session, self._port_path(host, port), data)
# For tracking ports in a remote_group, create a journal entry in
# the remote-group key-space.
# This will result in the creation of an etcd key with the port ID
# and it's security-group-id. The value is the list of IP addresses.
# For details on how the agent thread handles the remote-group
# watch events refer to the doc under RemoteGroupWatcher in the
# server module.
for remote_group_path in self._remote_group_paths(port):
db.journal_write(session, remote_group_path,
[item['ip_address'] for item in
data['fixed_ips']])
self.kick()
def unbind(self, session, port, host, segment):
# GPE requires segmentation ID for removing its etcd keys
segmentation_id = segment.get(api.SEGMENTATION_ID, 0)
LOG.debug("Queueing unbind request for port:%s, host:%s, segment:%s",
port, host, segmentation_id)
# When a port is unbound, this journal entry will delete the
# port key (and hence it's ip address value) from etcd. The behavior
# is like removing the port IP address(es) from the
# remote-group. The agent will receive a watch notification and
# update the ACL rules to remove the IP(s) from the rule.
# Other port IP addresses associated with the remote-group-id will
# remain in the rule (as it should be).
# A hypothetical alternate implementation - If we made the remote key
# of a secgroup, a list of IPs, we could refresh the content of
# every secgroup containing this port.
# It makes the sender's work harder, and receiver's work easier
# In the sender, we need keep track of IPs during port binds, unbinds,
# security-group associations and updates. However, algorithmically
# this alternate impl. won't be better what we have now i.e. O(n).
for remote_group_path in self._remote_group_paths(port):
db.journal_write(session, remote_group_path, None)
db.journal_write(session, self._port_path(host, port), None)
# Remove all GPE remote keys from etcd, for this port
for gpe_remote_path in self._gpe_remote_path(host, port,
segmentation_id):
db.journal_write(session,
gpe_remote_path,
None)
self.kick()
def remove_port_from_remote_groups(self, session, original_port,
current_port):
"""Remove ports from remote groups when port security is updated."""
removed_sec_groups = set(original_port['security_groups']) - set(
current_port['security_groups'])
for secgroup_id in removed_sec_groups:
db.journal_write(session,
self._remote_group_path(secgroup_id,
current_port['id']),
None)
self.kick()
def make_return_worker(self):
"""The thread that manages data returned from agents via etcd."""
# TODO(ijw): agents and physnets should be checked before a bind
# is accepted
# Note that the initial load is done before spawning the background
# watcher - this means that we're prepared with the information
# to accept bind requests.
class ReturnWatcher(etcdutils.EtcdChangeWatcher):
def __init__(self, etcd_client, name, watch_path,
election_path=None, data=None):
super(ReturnWatcher, self).__init__(etcd_client,
name, watch_path,
election_path,
wait_until_elected=True,
data=data)
# Every key changes on a restart, which has the
# useful effect of resending all Nova notifications
# for 'port bound' events based on existing state.
def added(self, key, value):
# Matches a port key, gets host and uuid
m = re.match('^([^/]+)/ports/([^/]+)$', key)
if m:
host = m.group(1)
port = m.group(2)
self.data.notify_bound(port, host)
else:
# Matches an agent, gets a liveness notification
m = re.match(self.data.state_key_space + '^([^/]+)/alive$',
key)
if m:
# TODO(ijw): this should be fed into the agents
# table.
host = m.group(1)
LOG.info('host %s is alive', host)
def removed(self, key):
# Nova doesn't much care when ports go away.
# Matches an agent, gets a liveness notification
m = re.match(self.data.state_key_space + '^([^/]+)/alive$',
key)
if m:
# TODO(ijw): this should be fed into the agents
# table.
host = m.group(1)
LOG.info('host %s has died', host)
# Assign a UUID to each worker thread to enable thread election
return eventlet.spawn(
ReturnWatcher(self.client_factory.client(), 'return_worker',
self.state_key_space, self.election_key_space,
data=self).watch_forever)