# Copyright (c) 2016 Cisco Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from abc import ABC, abstractmethod
from collections import namedtuple
import etcd
import eventlet
import eventlet.event
import os
from oslo_config import cfg
from oslo_log import log as logging
import re
import time

from networking_vpp import config_opts
from networking_vpp import constants as nvpp_const
from networking_vpp.db import db
from networking_vpp import etcdutils
from networking_vpp.ext_manager import ExtensionManager
from networking_vpp.extension import MechDriverExtensionBase

from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
import neutron_lib.constants as n_const
from neutron_lib import context as n_context
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api

import neutron.conf.agent.securitygroups_rpc

from neutron.db import provisioning_blocks


LOG = logging.getLogger(__name__)


class VPPMechanismDriver(api.MechanismDriver):
    supported_vnic_types = [portbindings.VNIC_NORMAL]
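    # Only 'normal' VNICs are bound by this driver; bind_port() below
    # refuses other VNIC types (e.g. SR-IOV 'direct' ports) so that other
    # mechanism drivers can take them.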
    MECH_NAME = 'vpp'

    def initialize(self):
        config_opts.register_vpp_opts(cfg.CONF)
        neutron.conf.agent.securitygroups_rpc.register_securitygroups_opts(
            cfg.CONF)

        self.allowed_network_types = cfg.CONF.ml2_vpp.network_types
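        # network_types limits which segment types check_segment() will
        # accept.  Illustrative ml2_conf.ini snippet (values are examples
        # only):
        #   [ml2_vpp]
        #   network_types = vlan,flat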
        self.communicator = EtcdAgentCommunicator(self.port_bind_complete)

        names = cfg.CONF.ml2_vpp.driver_extensions
        if names != '':
            self.mgr = ExtensionManager(
                'networking_vpp.driver.extensions',
                names,
                MechDriverExtensionBase)
            self.mgr.call_all('run', self.communicator)
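            # call_all('run', ...) hands each loaded extension the
            # communicator so that it can interact with etcd in the same
            # way as the core driver (how it uses it is up to the
            # extension).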

    def get_vif_type(self, port_context):
        """Determine the type of the vif to be bound from port context"""

        # Default vif type
        vif_type = 'vhostuser'

        # We have to explicitly avoid binding agent ports - DHCP,
        # L3 etc. - as vhostuser. The interface code below takes
        # care of those.

        owner = port_context.current['device_owner']
        for f in n_const.DEVICE_OWNER_PREFIXES:
            if owner.startswith(f):
                vif_type = 'tap'
        LOG.debug("vif_type to be bound is: %s", vif_type)
        return vif_type

    def bind_port(self, port_context):
        """Attempt to bind a port.

        :param port_context: PortContext instance describing the port

        This method is called outside any transaction to attempt to
        establish a port binding using this mechanism driver. Bindings
        may be created at each of multiple levels of a hierarchical
        network, and are established from the top level downward. At
        each level, the mechanism driver determines whether it can
        bind to any of the network segments in the
        port_context.segments_to_bind property, based on the value of the
        port_context.host property, any relevant port or network
        attributes, and its own knowledge of the network topology. At
        the top level, port_context.segments_to_bind contains the static
        segments of the port's network. At each lower level of
        binding, it contains static or dynamic segments supplied by
        the driver that bound at the level above. If the driver is
        able to complete the binding of the port to any segment in
        port_context.segments_to_bind, it must call port_context.set_binding
        with the binding details. If it can partially bind the port,
        it must call port_context.continue_binding with the network
        segments to be used to bind at the next lower level.

        If the binding results are committed after bind_port returns,
        they will be seen by all mechanism drivers as
        update_port_precommit and update_port_postcommit calls. But if
        some other thread or process concurrently binds or updates the
        port, these binding results will not be committed, and
        update_port_precommit and update_port_postcommit will not be
        called on the mechanism drivers with these results. Because
        binding results can be discarded rather than committed,
        drivers should avoid making persistent state changes in
        bind_port, or else must ensure that such state changes are
        eventually cleaned up.

        Implementing this method explicitly declares the mechanism
        driver as having the intention to bind ports. This is inspected
        by the QoS service to identify the available QoS rules you
        can use with ports.
        """
        LOG.debug("Attempting to bind port %(port)s on "
                  "network %(network)s",
                  {'port': port_context.current['id'],
                   'network': port_context.network.current['id']})
        vnic_type = port_context.current.get(portbindings.VNIC_TYPE,
                                             portbindings.VNIC_NORMAL)
        if vnic_type not in self.supported_vnic_types:
            LOG.debug("Refusing to bind due to unsupported "
                      "vnic_type: %s",
                      vnic_type)
            return

        for segment in port_context.segments_to_bind:
            if self.check_segment(segment, port_context.host):
                vif_details = {}
                # TODO(ijw) should be in a library that the agent uses
                vif_type = self.get_vif_type(port_context)
                if vif_type == 'vhostuser':
                    vif_details['vhostuser_socket'] = \
                        os.path.join(cfg.CONF.ml2_vpp.vhost_user_dir,
                                     port_context.current['id'])
                    vif_details['vhostuser_mode'] = 'server'
                LOG.debug('Setting details: %s', vif_details)
                port_context.set_binding(segment[api.ID],
                                         vif_type,
                                         vif_details)
                LOG.debug("Bind selected using segment: %s", segment)
                return

    def check_segment(self, segment, host):
        """Check if segment can be bound.

        :param segment: segment dictionary describing segment to bind
        :param host: host on which the segment must be bound to a port
        :returns: True iff segment can be bound for host

        """

        # TODO(ijw): naive - doesn't check host, or configured
        # physnets on the host. Should work out if the binding
        # can't be achieved before accepting it

        network_type = segment[api.NETWORK_TYPE]
        if network_type not in self.allowed_network_types:
            LOG.debug(
                'Network %(network_id)s is %(network_type)s, '
                'but this driver only supports types '
                '%(allowed_network_types)s. '
                'The type must be supported if binding is to succeed.',
                {'network_id': segment['id'],
                 'network_type': network_type,
                 'allowed_network_types':
                     ', '.join(self.allowed_network_types)}
            )
            return False

        if network_type in [n_const.TYPE_FLAT,
                            n_const.TYPE_VLAN]:
            physnet = segment[api.PHYSICAL_NETWORK]
            if not self.physnet_known(host, physnet):
                LOG.debug(
                    'Network %(network_id)s is on physical '
                    'network %(physnet)s, but the physical network '
                    'is not one the host %(host)s has attached.',
                    {'network_id': segment['id'],
                     'physnet': physnet,
                     'host': host}
                )
                return False

        return True

    def physnet_known(self, host, physnet):
        return self.communicator.find_physnet(host, physnet)

    def check_vlan_transparency(self, port_context):
        """Check if the network supports vlan transparency.

        :param port_context: NetworkContext instance describing the network.

        In general we do not support VLAN transparency (yet).
        """
        return False

    def update_port_precommit(self, port_context):
        """Work to do, during DB commit, when updating a port

        If we are partially responsible for binding this port, we will
        have to tell our agents they have work to do. This is an
        operation within a distributed system and can therefore take
        time to complete, or potentially even fail. Instead, we log
        the requirement to a journal.
        """
        # TODO(ijw): optimisation: the update port may leave the
        # binding state the same as before if someone updated
        # something other than the binding on the port, but this
        # way we always send it out and it's the far end's job to
        # ignore it. Doing less work is nevertheless good, so we
        # should in future avoid the send.

        # unbind port from old host, if already bound
        if port_context.original_binding_levels is not None:
            prev_bind = port_context.original_binding_levels[-1]

            if (prev_bind is not None and
                    prev_bind.get(api.BOUND_DRIVER) == self.MECH_NAME and
                    port_context.host != port_context.original_host):

                # Note that we skip this step if the change happens while
                # 'unbinding' and rebinding to the same host - it's probably
                # an update of extraneous detail and not really a request
                # that requires binding.

                self.communicator.unbind(port_context._plugin_context.session,
                                         port_context.original,
                                         port_context.original_host,
                                         prev_bind[api.BOUND_SEGMENT]
                                         )

        # (Re)bind port to the new host, if it needs to be bound
        if port_context.binding_levels is not None:
            current_bind = port_context.binding_levels[-1]

            if (current_bind is not None and
                    current_bind.get(api.BOUND_DRIVER) == self.MECH_NAME):

                binding_type = self.get_vif_type(port_context)
                # Remove port membership from any previously associated
                # security groups for updating remote_security_group_id ACLs
                self.communicator.remove_port_from_remote_groups(
                    port_context._plugin_context.session,
                    port_context.original,
                    port_context.current)

                self.communicator.bind(port_context._plugin_context.session,
                                       port_context.current,
                                       current_bind[api.BOUND_SEGMENT],
                                       port_context.host,
                                       binding_type)

                # TODO(ijW): The agent driver checks for a change of
                # host, but we're oddly seeing that the orig_host is
                # always set. Should confirm if this is a problem or
                # not.
                self._insert_provisioning_block(port_context)

                # If this is part of a successful live migration where the
                # target compute's nova has invoked update_port() binding the
                # port to the target compute and removing the 'migrating_to'
                # key from the binding:profile attribute then this is our
                # chance to remove etcd entries of the old port binding
                if (port_context.original is not None and
                        port_context.original_host is not None and
                        port_context.original_host != port_context.host and
                        'migrating_to' not in
                        port_context.current['binding:profile'] and
                        'migrating_to' in
                        port_context.original['binding:profile'] and
                        port_context.original['binding:profile']
                        ['migrating_to'] == port_context.host):
                    self.communicator.unbind(
                        port_context._plugin_context.session,
                        port_context.original,
                        port_context.original_host,
                        current_bind[api.BOUND_SEGMENT]
                    )

    def port_bind_complete(self, port_id, host):
        """Tell things that the port is truly bound.

        This is a callback called by the etcd communicator.

        """
        LOG.debug('bind complete on %s', port_id)
        self._release_provisioning_block(host, port_id)

    def _insert_provisioning_block(self, context):

        # we insert a status barrier to prevent the port from transitioning
        # to active until the agent reports back that the wiring is done
        port = context.current
        if port['status'] == n_const.PORT_STATUS_ACTIVE:
            # no point in putting in a block if the status is already ACTIVE
            return

        provisioning_blocks.add_provisioning_component(
            context._plugin_context, port['id'], resources.PORT,
            provisioning_blocks.L2_AGENT_ENTITY)

    def _release_provisioning_block(self, host, port_id):
        context = n_context.get_admin_context()
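        # The agent has signalled via etcd that the dataplane wiring for
        # this port is in place; releasing the L2 agent block lets Neutron
        # move the port to ACTIVE (and, in turn, notify Nova).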
        provisioning_blocks.provisioning_complete(
            context, port_id, resources.PORT,
            provisioning_blocks.L2_AGENT_ENTITY)

    def update_port_postcommit(self, port_context):
        """Work to do, post-DB commit, when updating a port

        After accepting an update_port, determine if we logged any
        work to do on the networking devices, and - if so - kick the
        update thread that tells our minions.

        """
        # TODO(ijw): optimisation: the update port may leave the
        # binding state the same as before if someone updated
        # something other than the binding on the port, but this
        # way we always send it out and it's the far end's job to
        # ignore it. Doing less work is nevertheless good, so we
        # should in future avoid the send.

        if port_context.binding_levels is not None:
            current_bind = port_context.binding_levels[-1]
            if port_context.original_binding_levels is None:
                prev_bind = None
            else:
                prev_bind = port_context.original_binding_levels[-1]

            if (current_bind is not None and
                    current_bind.get(api.BOUND_DRIVER) == self.MECH_NAME):
                self.communicator.kick()
            elif (prev_bind is not None and
                    prev_bind.get(api.BOUND_DRIVER) == self.MECH_NAME):
                self.communicator.kick()

    def delete_port_precommit(self, port_context):
        port = port_context.current
        host = port_context.host
        # NB: Host is typically '' if the port is not bound
        # A port can be in an invalid state with a host context
        if host and port_context.binding_levels:
            segment = port_context.binding_levels[-1][api.BOUND_SEGMENT]
            self.communicator.unbind(port_context._plugin_context.session,
                                     port, host, segment)

    def delete_port_postcommit(self, port_context):
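        # The unbind itself was journalled during the precommit phase;
        # kicking here just prompts the forward worker to flush the journal
        # to etcd now that the DB commit has gone through.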
        self.communicator.kick()


class AgentCommunicator(ABC):
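    """Interface for telling agents about port binding changes.

    Concrete implementations (EtcdAgentCommunicator below is the one used
    here) are expected to deliver bind and unbind requests to the agents
    reliably.
    """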

    @abstractmethod
    def bind(self, port, segment, host, binding_type):
        pass

    @abstractmethod
    def unbind(self, port, host):
        pass


# Our prefix for etcd keys, in case others are using etcd.
LEADIN = nvpp_const.LEADIN  # TODO(ijw): make configurable?
# Model for representing a security group
SecurityGroup = namedtuple(
    'SecurityGroup', ['id', 'ingress_rules', 'egress_rules']
)
# Model for a VPP security group rule
SecurityGroupRule = namedtuple(
    'SecurityGroupRule', ['is_ipv6', 'remote_ip_addr',
                          'ip_prefix_len', 'remote_group_id',
                          'protocol', 'port_min', 'port_max']
)


class JournalManager(object):
    """Perform journal management tasks.

    Handles the post-journal part of the work that clears out the table and
    updates etcd. Also kicks the forward worker thread that is active and
    tells it that it has work to do.
    """
    def __init__(self):

        self.client_factory = etcdutils.EtcdClientFactory(cfg.CONF.ml2_vpp)

        self.election_key_space = LEADIN + '/election'
        self.journal_kick_key = self.election_key_space + '/kick-journal'
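        # Writing any value to this key wakes whichever process currently
        # holds the forward-worker election (see kick() and
        # _forward_worker() below).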
        LOG.debug('Journal manager init complete')

    def kick(self):
        # A thread in one Neutron process - possibly not this one -
        # is waiting to send updates from the DB to etcd. Wake it.
        try:
            # TODO(ijw): got to be a more efficient way to create
            # a client for each thread
            # From a Neutron thread, we need to tell whichever of the
            # forwarder threads that is active that it has work to do.
            self.client_factory.client().write(self.journal_kick_key, '')
        except etcd.EtcdException:
            # A failed wake is not the end of the world.
            pass

    ######################################################################
    # The post-journal part of the work that clears out the table and
    # updates etcd.

    def make_forward_worker(self):
        # Assign a UUID to each worker thread to enable thread election
        return eventlet.spawn(self._forward_worker)

    def do_etcd_update(self, etcd_writer, k, v):
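        # Write (or, if the value is None, delete) one journalled key in
        # etcd.  The operation is bounded by etcd_write_time so that a slow
        # or unreachable etcd cannot stall the forward worker indefinitely.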
        with eventlet.Timeout(cfg.CONF.ml2_vpp.etcd_write_time, False):
            if v is None:
                etcd_writer.delete(k)
            else:
                etcd_writer.write(k, v)

    def _forward_worker(self):
        LOG.debug('forward worker begun')
        etcd_client = self.client_factory.client()
        etcd_writer = etcdutils.json_writer(etcd_client)
        lease_time = cfg.CONF.ml2_vpp.forward_worker_master_lease_time
        recovery_time = cfg.CONF.ml2_vpp.forward_worker_recovery_time

        etcd_election = etcdutils.EtcdElection(etcd_client, 'forward_worker',
                                               self.election_key_space,
                                               work_time=lease_time,
                                               recovery_time=recovery_time)
        while True:
            # Try indefinitely to regain the mastery of this thread pool.
            # Most threads will be sitting here
            etcd_election.wait_until_elected()
            try:
                # Master loop - as long as we are master and can
                # maintain it, process incoming events.

                # Every long running section is preceded by extending
                # mastership of the thread pool and followed by
                # confirmation that we still have mastership (usually
                # by a further extension).

                def work(k, v):
                    self.do_etcd_update(etcd_writer, k, v)

                # We will try to empty the pending rows in the DB
                while True:
                    etcd_election.extend_election(
                        cfg.CONF.ml2_vpp.db_query_time)
                    session = n_context.get_admin_context().session
                    maybe_more = db.journal_read(session, work)
                    if not maybe_more:
                        LOG.debug('forward worker has emptied journal')
                        etcd_election.extend_election(lease_time)
                        break

                # work queue is now empty.

                # Wait to be kicked, or (in case of emergency) run
                # every few seconds in case another thread or process
                # dumped work and failed to get notification to us to
                # process it.
                with eventlet.Timeout(lease_time + 1, False):
                    etcd_election.extend_election(lease_time)
                    try:
                        etcd_client.watch(self.journal_kick_key,
                                          timeout=lease_time)
                    except etcd.EtcdException:
                        # Check the DB queue now, anyway
                        pass
            except etcdutils.EtcdElectionLost:
                # We are no longer master
                pass
            except Exception as e:
                # TODO(ijw): log exception properly
                LOG.warning("problems in forward worker - error name is %s; "
                            "proceeding without quitting",
                            type(e).__name__)
                LOG.warning("Exception in forward_worker: %s", e)
                # something went bad; breathe, in case we end
                # up in a tight loop
                time.sleep(1)
                # never quit
                pass

    ######################################################################


class EtcdAgentCommunicator(AgentCommunicator, JournalManager):
    """Comms unit for etcd

    This will talk to etcd and tell it what is going on
    with the Neutron database. etcd can be run as a
    cluster and so shouldn't die, but can be unreachable,
    so this class's job is to ensure that all updates are
    forwarded to etcd in order even when things are not
    quite going as planned.

    In etcd, the layout is:

    # Port Space
    LEADIN/nodes - subdirs are compute nodes
    LEADIN/nodes/X/ports - entries are JSON-ness containing
    all information on each bound port on the compute node.
    (Unbound ports are homeless, so the act of unbinding is
    the deletion of this entry.)

    # Global Space
    LEADIN/global/secgroups/<security-group-id> - entries are security-group
    keys whose values contain all the rule data
    LEADIN/global/networks/gpe/<vni>/<hostname>/<mac>/<ip-address> - Entries
    contain GPE data such as the VNI, hostname, instance's mac & IP address
    and key value is the underlay IP address of the compute node
    LEADIN/global/remote_group/<group-id>/<port-id> - entries contain the IP
    addresses of ports in a security-group

    # State Space
    LEADIN/state/X - return state of the VPP
    LEADIN/state/X/alive - heartbeat back
    LEADIN/state/X/ports - port information.
    LEADIN/state/X/physnets - physnets on node
    Specifically a key here (regardless of value) indicates
    the port has been bound and is receiving traffic.
    """

    def __init__(self, notify_bound):
        super(EtcdAgentCommunicator, self).__init__()
        LOG.debug("Using etcd host:%s port:%s user:%s",
                  cfg.CONF.ml2_vpp.etcd_host,
                  cfg.CONF.ml2_vpp.etcd_port,
                  cfg.CONF.ml2_vpp.etcd_user)

        # This is a function that is called when a port has been
        # notified from the agent via etcd as completely attached.

        # We call this when we're certain that the VPP on the far end
        # has definitely bound the port, and has dropped a vhost-user
        # socket where it can be found.

        # This is more important than it seems, because libvirt will
        # hang, because qemu ignores its monitor port, when qemu is
        # waiting for a partner to connect with on its vhost-user
        # interfaces. It can't start the VM - that requires
        # information from its partner it can't guess at - but it
        # shouldn't hang the monitor - nevertheless... So we notify
        # when the port is there and ready, and qemu is never put into
        # this state by Nova.
        self.notify_bound = notify_bound

        # We need certain directories to exist
        self.state_key_space = LEADIN + '/state'
        self.port_key_space = LEADIN + '/nodes'
        self.secgroup_key_space = LEADIN + '/global/secgroups'
        self.remote_group_key_space = LEADIN + '/global/remote_group'
        self.gpe_key_space = LEADIN + '/global/networks/gpe'

        etcd_client = self.client_factory.client()
        etcd_helper = etcdutils.EtcdHelper(etcd_client)
        etcd_helper.ensure_dir(self.state_key_space)
        etcd_helper.ensure_dir(self.port_key_space)
        etcd_helper.ensure_dir(self.secgroup_key_space)
        etcd_helper.ensure_dir(self.election_key_space)
        etcd_helper.ensure_dir(self.remote_group_key_space)

        self.secgroup_enabled = cfg.CONF.SECURITYGROUP.enable_security_group
        if self.secgroup_enabled:
            self.register_secgroup_event_handler()

        # TODO(ijw): .../state/<host> lists all known hosts, and they
        # heartbeat when they're functioning

        # From this point on, there are multiple threads: ensure that
        # we don't re-use the etcd_client from multiple threads
        # simultaneously
        etcd_helper = None
        etcd_client = None

        registry.subscribe(self.start_threads,
                           resources.PROCESS,
                           events.AFTER_SPAWN)

    def start_threads(self, resource, event, trigger, payload=None):
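        # Invoked via the PROCESS/AFTER_SPAWN subscription above, once a
        # Neutron worker process has been spawned, so that each process
        # runs its own etcd return watcher and journal forward worker.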
        LOG.debug('Starting background threads for Neutron worker')
        self.return_thread = self.make_return_worker()
        self.forward_thread = self.make_forward_worker()

    def find_physnet(self, host, physnet):
        """Identify if an agent can connect to the physical network

        This can fail if the agent hasn't been configured for that
        physnet, or if the agent isn't alive.
        """

        # TODO(ijw): we use an on-the-fly created client so that we
        # don't have threading problems from the caller.
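        # Both the agent's liveness key and its physnet key must be
        # present; a missing key raises EtcdKeyNotFound and the physnet is
        # reported as unavailable on that host.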
        try:
            etcd_client = self.client_factory.client()

            etcd_client.read('%s/state/%s/alive' % (LEADIN, host))
            etcd_client.read('%s/state/%s/physnets/%s' %
                             (LEADIN, host, physnet))
        except etcd.EtcdKeyNotFound:
            return False
        return True

    def register_secgroup_event_handler(self):
        """Subscribe a handler to process secgroup change notifications

        We're interested in PRECOMMIT_xxx (where we store the changes
        to etcd in the journal table) and AFTER_xxx (where we
        remind the worker thread it may have work to do).
        """

        LOG.info("Security groups feature is enabled")

        # NB security group rules cannot be updated, and security
        # groups themselves have no forwarder state in them, so we
        # don't need the update events

        # register pre-commit events
        # security group precommit events
        registry.subscribe(self.process_secgroup_commit,
                           resources.SECURITY_GROUP,
                           events.PRECOMMIT_CREATE)
        registry.subscribe(self.process_secgroup_commit,
                           resources.SECURITY_GROUP,
                           events.PRECOMMIT_DELETE)
        # security group rule precommit events
        registry.subscribe(self.process_secgroup_commit,
                           resources.SECURITY_GROUP_RULE,
                           events.PRECOMMIT_CREATE)
        registry.subscribe(self.process_secgroup_commit,
                           resources.SECURITY_GROUP_RULE,
                           events.PRECOMMIT_DELETE)

        # register post-commit events
        # security group post commit events
        registry.subscribe(self.process_secgroup_after,
                           resources.SECURITY_GROUP,
                           events.AFTER_CREATE)
        registry.subscribe(self.process_secgroup_after,
                           resources.SECURITY_GROUP,
                           events.AFTER_DELETE)
        # security group rule post commit events
        registry.subscribe(self.process_secgroup_after,
                           resources.SECURITY_GROUP_RULE,
                           events.AFTER_CREATE)
        registry.subscribe(self.process_secgroup_after,
                           resources.SECURITY_GROUP_RULE,
                           events.AFTER_DELETE)

    def process_secgroup_after(self, resource, event, trigger, **kwargs):
        """Callback for handling security group/rule commit-complete events

        This is when we should tell other things that a change has
        happened and has been recorded permanently in the DB.
        """

        # Whatever the object that caused this, we've put something
        # in the journal and now need to nudge the communicator
        self.kick()

    def process_secgroup_commit(self, resource, event, trigger, **kwargs):
        """Callback for handling security group/rule commit events

        This is the time at which we should be committing any of our
        own auxiliary changes to the DB.
        """
        LOG.debug("Received event %s notification for resource"
                  " %s with kwargs %s", event, resource, kwargs)

        context = kwargs['context']

        sgid = None
        deleted_rule_id = None
        added_rule = {}

        if resource == resources.SECURITY_GROUP:
            if event == events.PRECOMMIT_DELETE:
                self.delete_secgroup_from_etcd(context.session,
                                               kwargs['security_group_id'])
                # Job done.
                return

            # All other event types drop out to the update-the-key
            # code at the bottom of the function.

            elif event == events.PRECOMMIT_CREATE:
                # Also, the SG passed to us is what comes in from the user. We
                # require what went into the DB (where we added a UUID to it),
                # so we have to get that from the record of changed and as yet
                # uncommitted objects.
                sgid = kwargs[resource]['id']

                # The default rules are supplied directly in the context
                rules = kwargs[resource]['security_group_rules']

        elif resource == resources.SECURITY_GROUP_RULE:
            # We store security groups with a composite of all their
            # rules. So in this case we track down the affected
            # rule and update its entire data.
            # NB: rules are never updated.

            if event == events.PRECOMMIT_DELETE:
                deleted_rule_id = kwargs['security_group_rule_id']
                sgid = kwargs['security_group_id']

            elif event == events.PRECOMMIT_CREATE:
                # Groups don't have the same UUID problem - we're not
                # using their UUID, we're using their SG's, which must
                # be present.
                added_rule = kwargs['security_group_rule']
                sgid = added_rule['security_group_id']

            # Start by getting the rules the DB knows about
            rules = self.get_rules_for_secgroup(sgid, context)

        # Adding a group or changing rules results in the same behaviour:
        # rewrite the key in etcd.

        # If we're in the precommit part, we may have deleted rules in this
        # list and we should exclude them. We also, cautiously, exclude any
        # added rule as well, though it should not have made it into the DB.
        # If either is unset it's None, and != None is not a test that will
        # pass.
        rules = [r for r in rules
                 if r['id'] != deleted_rule_id
                 and r['id'] != added_rule.get('id')]

        if added_rule:
            rules.append(added_rule)

        # Get the full details of the secgroup in etcd-exchange format
        secgroup = self.get_secgroup_from_rules(sgid, rules)

        # Write security group data to etcd
        self.send_secgroup_to_agents(context.session, secgroup)

    def get_rules_for_secgroup(self, sgid, context):
        plugin = directory.get_plugin()
        rules = plugin.get_security_group_rules(
            context, filters={'security_group_id': [sgid]}
        )

        return rules

    def get_secgroup_rule(self, rule_id, context):
        """Fetch and return a security group rule from Neutron DB"""
        plugin = directory.get_plugin()
        return plugin.get_security_group_rule(context, rule_id)

    def get_secgroup_from_rules(self, sgid, rules):
        """Build and return a security group namedtuple object.

        This object is the format with which we exchange data with
        the agents, and can be written in this form to etcd.

        Arguments:
        sgid - ID of the security group
        rules - A list of security group rules as returned from the DB

        1. Filter rules using the input param: sgid to ensure that rules
        belong to that group
        2. Split rules based on direction.
        3. Construct and return the SecurityGroup namedtuple.
        """
        # A generator object of security group rules for sgid
        sg_rules = (r for r in rules if r['security_group_id'] == sgid)

        # A list of ingress and egress namedtuple rule objects
        ingress_rules = []
        egress_rules = []
        for r in sg_rules:
            if r['direction'] == 'ingress':
                ingress_rules.append(self._neutron_rule_to_vpp_acl(r))
            else:
                egress_rules.append(self._neutron_rule_to_vpp_acl(r))
        return SecurityGroup(sgid, ingress_rules, egress_rules)

    # Neutron supports the following IP protocols by name
    protocols = {'tcp': 6, 'udp': 17, 'icmp': 1, 'icmpv6': 58,
                 'ah': 51, 'dccp': 33, 'egp': 8, 'esp': 50, 'gre': 47,
                 'igmp': 2, 'ipv6-encap': 41, 'ipv6-frag': 44,
                 'ipv6-icmp': 58, 'ipv6-nonxt': 59, 'ipv6-opts': 60,
                 'ipv6-route': 43, 'ospf': 89, 'pgm': 113, 'rsvp': 46,
                 'sctp': 132, 'udplite': 136,
                 'vrrp': 112}

    def _neutron_rule_to_vpp_acl(self, rule):
        """Convert a neutron rule to vpp_acl rule.

        Arguments:
        1. rule -- represents a neutron rule

        - Convert the neutron rule to a vpp_acl rule model
        - Return the SecurityGroupRule namedtuple.
        """
        is_ipv6 = 0 if rule['ethertype'] == 'IPv4' else 1

        if rule['protocol'] is None:
            # Neutron uses None to represent any protocol
            # We use 0 to represent any protocol
            protocol = 0
        elif rule['protocol'] in self.protocols:
            # VPP rules require IANA protocol numbers
            # Convert input accordingly.
            protocol = self.protocols[rule['protocol']]
        else:
            # Convert incoming string value to an integer
            protocol = int(rule['protocol'])

        if is_ipv6 and protocol == self.protocols['icmp']:
            protocol = self.protocols['icmpv6']

        # Neutron represents any ip address by setting
        # both the remote_ip_prefix and remote_group_id fields to None
        # VPP uses all zeros to represent any Ipv4/IpV6 address
        # In a neutron security group rule, you can either set the
        # remote_ip_prefix or remote_group_id but not both.
        # When a remote_ip_prefix value is set, the remote_group_id
        # is ignored and vice versa. If both the attributes are unset,
        # any remote_ip_address is permitted.
        if rule['remote_ip_prefix']:
            remote_ip_addr, ip_prefix_len = rule['remote_ip_prefix'
                                                 ].split('/')
            ip_prefix_len = int(ip_prefix_len)
            # Set the required attribute, referenced by the SecurityGroupRule
            # tuple, remote_group_id to None.
            remote_group_id = None
        elif rule['remote_group_id']:
            remote_group_id = rule['remote_group_id']
            # Set remote_ip_addr and ip_prefix_len to empty values
            # as it is a required attribute referenced by the
            # SecurityGroupRule tuple. When the remote_ip_addr value is set
            # to None, the vpp-agent ignores it and looks at the
            # remote-group-id. One of these attributes must be set to a valid
            # value.
            remote_ip_addr, ip_prefix_len = None, 0
        else:
            # In neutron, when both the remote_ip_prefix and remote-group-id
            # are set to None, it implies permit any. But we need to set a
            # valid value for the remote_ip_address attribute to tell the
            # vpp-agent.
            remote_ip_addr = '0.0.0.0' if not is_ipv6 else '::'
            ip_prefix_len = 0
            # Set the required attribute in the SecurityGroupRule tuple
            # remote_group_id to None.
            remote_group_id = None
        # Neutron uses -1 or None or 0 to represent all ports
        # VPP uses 0-65535 for all tcp/udp ports. Use -1 to represent all
        # ranges for ICMP types and codes
        if rule['port_range_min'] == -1 or not rule['port_range_min']:
            # Valid TCP/UDP port ranges for TCP(6), UDP(17) and UDPLite(136)
            if protocol in [6, 17, 136]:
                port_min, port_max = (0, 65535)
            # A value of -1 represents all ICMP/ICMPv6 types & code ranges
            elif protocol in [1, 58]:
                port_min, port_max = (-1, -1)
            # Ignore port_min and port_max fields as other protocols don't
            # use them
            else:
                port_min, port_max = (0, 0)
        else:
            port_min, port_max = (rule['port_range_min'],
                                  rule['port_range_max'])

        # Handle a couple of special ICMP cases
        if protocol in [1, 58]:
            # All ICMP types and codes
            if rule['port_range_min'] is None:
                port_min, port_max = (-1, -1)
            # All codes for a specific type
            elif rule['port_range_max'] is None:
                port_min, port_max = (rule['port_range_min'], -1)

        sg_rule = SecurityGroupRule(is_ipv6, remote_ip_addr,
                                    ip_prefix_len,
                                    remote_group_id, protocol, port_min,
                                    port_max)
        return sg_rule

    def send_secgroup_to_agents(self, session, secgroup):
        """Writes a secgroup to the etcd secgroup space

        Does this via the journal as part of the commit, so
        that the write is atomic with the DB commit to the
        Neutron tables.

        Arguments:
        session -- the DB session with an open transaction
        secgroup -- Named tuple representing a SecurityGroup
        """
        secgroup_path = self._secgroup_path(secgroup.id)
        # sg is a dict of ingress and egress rule lists
        sg = {}
        ingress_rules = []
        egress_rules = []
        for ingress_rule in secgroup.ingress_rules:
            ingress_rules.append(ingress_rule._asdict())
        for egress_rule in secgroup.egress_rules:
            egress_rules.append(egress_rule._asdict())
        sg['ingress_rules'] = ingress_rules
        sg['egress_rules'] = egress_rules
        db.journal_write(session, secgroup_path, sg)

    def delete_secgroup_from_etcd(self, session, secgroup_id):
        """Deletes the secgroup key from etcd

        Arguments:
        secgroup_id -- The id of the security group that we want to delete
        """
        secgroup_path = self._secgroup_path(secgroup_id)
        # Delete the security-group from remote-groups
        remote_group_path = self._remote_group_path(secgroup_id, '')
        db.journal_write(session, secgroup_path, None)
        db.journal_write(session, remote_group_path, None)

    def _secgroup_path(self, secgroup_id):
        return self.secgroup_key_space + "/" + secgroup_id

    def _port_path(self, host, port):
        return self.port_key_space + "/" + host + "/ports/" + port['id']

    # TODO(najoy): Move all security groups related code to a dedicated
    # module
    def _remote_group_path(self, secgroup_id, port_id):
        return self.remote_group_key_space + "/" + secgroup_id + "/" + port_id

    def _gpe_remote_path(self, host, port, segmentation_id):
        ip_addrs = port.get('fixed_ips', [])
        gpe_dir = self.gpe_key_space + "/" + str(segmentation_id) + "/" + \
            host + "/" + port['mac_address']
        if ip_addrs and segmentation_id:
            # Delete all GPE keys and the empty GPE directory itself in etcd
            return [gpe_dir + "/" + ip_address['ip_address']
                    for ip_address in ip_addrs] + [gpe_dir]
        else:
            return []

    # A remote_group_path is qualified with a port ID because the agent uses
    # the port ID to keep track of the dynamic set of ports associated with
    # the remote_group_id. The value of this key is the
    # list of IP addresses allocated to that port by neutron.
    # Using the above two pieces of information, the vpp-agent
    # computes the complete set of IP addresses belonging to a
    # remote_group_id and uses it to expand the rule when a remote_group_id
    # attribute is specified. The expansion is performed by computing a
    # product using all the IP addresses of the ports in the remote_group
    # and the remaining attributes of the rule.
    def _remote_group_paths(self, port):
        security_groups = port.get('security_groups', [])
        return [self._remote_group_path(secgroup_id, port['id'])
                for secgroup_id in security_groups]

    ######################################################################
    # These functions use a DB journal to log messages before
    # they're put into etcd, as that can take time and may not always
    # succeed (so is a bad candidate for calling within or after a
    # transaction).

    def bind(self, session, port, segment, host, binding_type):
        # NB segmentation_id is not optional in the wireline protocol,
        # we just pass 0 for unsegmented network types
        data = {
            'mac_address': port['mac_address'],
            'mtu': 1500,  # not this, but what?: port['mtu'],
            'physnet': segment[api.PHYSICAL_NETWORK],
            'network_type': segment[api.NETWORK_TYPE],
            'segmentation_id': segment.get(api.SEGMENTATION_ID, 0),
            'binding_type': binding_type,
            'security_groups': port.get('security_groups', []),
            'allowed_address_pairs': port.get('allowed_address_pairs', []),
            'fixed_ips': port.get('fixed_ips', []),
            'port_security_enabled': port.get('port_security_enabled', True),
            # Non-essential, but useful for monitoring and debug:
            'device_id': port.get('device_id', None)
        }
        LOG.debug("Queueing bind request for port:%s, "
                  "segment:%s, host:%s, type:%s",
                  port, data['segmentation_id'],
                  host, data['binding_type'])

        db.journal_write(session, self._port_path(host, port), data)
        # For tracking ports in a remote_group, create a journal entry in
        # the remote-group key-space.
        # This will result in the creation of an etcd key with the port ID
        # and its security-group-id. The value is the list of IP addresses.
        # For details on how the agent thread handles the remote-group
        # watch events refer to the doc under RemoteGroupWatcher in the
        # server module.
        for remote_group_path in self._remote_group_paths(port):
            db.journal_write(session, remote_group_path,
                             [item['ip_address'] for item in
                              data['fixed_ips']])
        self.kick()

    def unbind(self, session, port, host, segment):
        # GPE requires segmentation ID for removing its etcd keys
        segmentation_id = segment.get(api.SEGMENTATION_ID, 0)
        LOG.debug("Queueing unbind request for port:%s, host:%s, segment:%s",
                  port, host, segmentation_id)
        # When a port is unbound, this journal entry will delete the
        # port key (and hence its ip address value) from etcd. The behavior
        # is like removing the port IP address(es) from the
        # remote-group. The agent will receive a watch notification and
        # update the ACL rules to remove the IP(s) from the rule.
        # Other port IP addresses associated with the remote-group-id will
        # remain in the rule (as it should be).
        # A hypothetical alternate implementation - if we made the remote key
        # of a secgroup a list of IPs, we could refresh the content of
        # every secgroup containing this port.
        # It makes the sender's work harder, and the receiver's work easier.
        # In the sender, we would need to keep track of IPs during port
        # binds, unbinds, security-group associations and updates. However,
        # algorithmically this alternate impl. won't be better than what we
        # have now, i.e. O(n).
        for remote_group_path in self._remote_group_paths(port):
            db.journal_write(session, remote_group_path, None)
        db.journal_write(session, self._port_path(host, port), None)
        # Remove all GPE remote keys from etcd, for this port
        for gpe_remote_path in self._gpe_remote_path(host, port,
                                                     segmentation_id):
            db.journal_write(session,
                             gpe_remote_path,
                             None)
        self.kick()

    def remove_port_from_remote_groups(self, session, original_port,
                                       current_port):
        """Remove ports from remote groups when port security is updated."""
        removed_sec_groups = set(original_port['security_groups']) - set(
            current_port['security_groups'])
        for secgroup_id in removed_sec_groups:
            db.journal_write(session,
                             self._remote_group_path(secgroup_id,
                                                     current_port['id']),
                             None)
        self.kick()

    def make_return_worker(self):
        """The thread that manages data returned from agents via etcd."""

        # TODO(ijw): agents and physnets should be checked before a bind
        # is accepted

        # Note that the initial load is done before spawning the background
        # watcher - this means that we're prepared with the information
        # to accept bind requests.

        class ReturnWatcher(etcdutils.EtcdChangeWatcher):

            def __init__(self, etcd_client, name, watch_path,
                         election_path=None, data=None):
                super(ReturnWatcher, self).__init__(etcd_client,
                                                    name, watch_path,
                                                    election_path,
                                                    wait_until_elected=True,
                                                    data=data)

            # Every key changes on a restart, which has the
            # useful effect of resending all Nova notifications
            # for 'port bound' events based on existing state.
            def added(self, key, value):
                # Matches a port key, gets host and uuid
                m = re.match('^([^/]+)/ports/([^/]+)$', key)

                if m:
                    host = m.group(1)
                    port = m.group(2)

                    self.data.notify_bound(port, host)
                else:
                    # Matches an agent, gets a liveness notification
                    # (keys here are relative to the watched state space,
                    # as in the port match above)
                    m = re.match('^([^/]+)/alive$', key)

                    if m:
                        # TODO(ijw): this should be fed into the agents
                        # table.
                        host = m.group(1)

                        LOG.info('host %s is alive', host)

            def removed(self, key):
                # Nova doesn't much care when ports go away.

                # Matches an agent, gets a liveness notification
                m = re.match('^([^/]+)/alive$', key)

                if m:
                    # TODO(ijw): this should be fed into the agents
                    # table.
                    host = m.group(1)

                    LOG.info('host %s has died', host)

        # Assign a UUID to each worker thread to enable thread election
        return eventlet.spawn(
            ReturnWatcher(self.client_factory.client(), 'return_worker',
                          self.state_key_space, self.election_key_space,
                          data=self).watch_forever)