lb: ml2-agt: Separate AgentLoop from LinuxBridge specific impl

The goal is to extract the common agent code from the linuxbridge agent
to share this code with other agents (e.g. sriov and new macvtap [1]).
This is a first step into the direction of a so called modular l2
agent.

Therefore all linuxbridge implementation specifics are moved into the
LinuxBridgeManager class. The manager class will be passed as argument
into the common agent loop instead of instantiating it in its
constructor. In addition the network_maps and the updated_devices map
has been moved into the rpc class.

A clear manager interface has been defined for the communication
between the common agent loop and the impl specific manager class.

In a follow up patchset, the common agent loop will be moved into a
new file. This has not yet happened to simplify tracking the code
changes during review.

[1] https://bugs.launchpad.net/neutron/+bug/1480979

Change-Id: Ia71f5a403b7029f8cc591f83df91ab2d3916f3f8
Partial-Bug: #1468803
Partial-Bug: #1480979
This commit is contained in:
Andreas Scheuring 2015-10-13 13:21:32 +02:00
parent 9434fdec0b
commit 6e29cdd6b6
9 changed files with 554 additions and 261 deletions

View File

@ -43,6 +43,7 @@ import neutron.extensions.l3
import neutron.extensions.securitygroup import neutron.extensions.securitygroup
import neutron.openstack.common.cache.cache import neutron.openstack.common.cache.cache
import neutron.plugins.ml2.config import neutron.plugins.ml2.config
import neutron.plugins.ml2.drivers.agent.config
import neutron.plugins.ml2.drivers.linuxbridge.agent.common.config import neutron.plugins.ml2.drivers.linuxbridge.agent.common.config
import neutron.plugins.ml2.drivers.mech_sriov.agent.common.config import neutron.plugins.ml2.drivers.mech_sriov.agent.common.config
import neutron.plugins.ml2.drivers.mech_sriov.mech_driver.mech_driver import neutron.plugins.ml2.drivers.mech_sriov.mech_driver.mech_driver
@ -179,8 +180,7 @@ def list_linux_bridge_opts():
neutron.plugins.ml2.drivers.linuxbridge.agent.common.config. neutron.plugins.ml2.drivers.linuxbridge.agent.common.config.
vxlan_opts), vxlan_opts),
('agent', ('agent',
neutron.plugins.ml2.drivers.linuxbridge.agent.common.config. neutron.plugins.ml2.drivers.agent.config.agent_opts),
agent_opts),
('securitygroup', ('securitygroup',
neutron.agent.securitygroups_rpc.security_group_opts) neutron.agent.securitygroups_rpc.security_group_opts)
] ]

View File

@ -0,0 +1,204 @@
# Copyright (c) 2016 IBM Corp.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo_log import log as logging
import six
LOG = logging.getLogger(__name__)
class NetworkSegment(object):
"""Represents a Neutron network segment"""
def __init__(self, network_type, physical_network, segmentation_id):
self.network_type = network_type
self.physical_network = physical_network
self.segmentation_id = segmentation_id
@six.add_metaclass(abc.ABCMeta)
class CommonAgentManagerRpcCallBackBase(object):
"""Base class for managers RPC callbacks.
This class must be inherited by a RPC callback class that is used
in combination with the common agent.
"""
def __init__(self, context, agent, sg_agent):
self.context = context
self.agent = agent
self.sg_agent = sg_agent
self.network_map = {}
# stores received port_updates and port_deletes for
# processing by the main loop
self.updated_devices = set()
@abc.abstractmethod
def security_groups_rule_updated(self, context, **kwargs):
"""Callback for security group rule update.
:param security_groups: list of updated security_groups
"""
@abc.abstractmethod
def security_groups_member_updated(self, context, **kwargs):
"""Callback for security group member update.
:param security_groups: list of updated security_groups
"""
@abc.abstractmethod
def security_groups_provider_updated(self, context, **kwargs):
"""Callback for security group provider update."""
def add_network(self, network_id, network_segment):
"""Add a network to the agent internal network list
:param network_id: The UUID of the network
:param network_segment: The NetworkSegment object for this network
"""
self.network_map[network_id] = network_segment
def get_and_clear_updated_devices(self):
"""Get and clear the list of devices for which a update was received.
:return: set - A set with updated devices. Format is ['tap1', 'tap2']
"""
# Save and reinitialize the set variable that the port_update RPC uses.
# This should be thread-safe as the greenthread should not yield
# between these two statements.
updated_devices = self.updated_devices
self.updated_devices = set()
return updated_devices
@six.add_metaclass(abc.ABCMeta)
class CommonAgentManagerBase(object):
"""Base class for managers that are used with the common agent loop.
This class must be inherited by a manager class that is used
in combination with the common agent.
"""
@abc.abstractmethod
def ensure_port_admin_state(self, device, admin_state_up):
"""Enforce admin_state for a port
:param device: The device for which the admin_state should be set
:param admin_state_up: True for admin_state_up, False for
admin_state_down
"""
@abc.abstractmethod
def get_agent_configurations(self):
"""Establishes the agent configuration map.
The content of this map is part of the agent state reports to the
neutron server.
:return: map -- the map containing the configuration values
:rtype: dict
"""
@abc.abstractmethod
def get_agent_id(self):
"""Calculate the agent id that should be used on this host
:return: str -- agent identifier
"""
@abc.abstractmethod
def get_all_devices(self):
"""Get a list of all devices of the managed type from this host
A device in this context is a String that represents a network device.
This can for example be the name of the device or its MAC address.
This value will be stored in the Plug-in and be part of the
device_details.
Typically this list is retrieved from the sysfs. E.g. for linuxbridge
it returns all names of devices of type 'tap' that start with a certain
prefix.
:return: set -- the set of all devices e.g. ['tap1', 'tap2']
"""
@abc.abstractmethod
def get_extension_driver_type(self):
"""Get the agent extension driver type.
:return: str -- The String defining the agent extension type
"""
@abc.abstractmethod
def get_rpc_callbacks(self, context, agent, sg_agent):
"""Returns the class containing all the agent rpc callback methods
:return: class - the class containing the agent rpc callback methods.
It must reflect the CommonAgentManagerRpcCallBackBase Interface.
"""
@abc.abstractmethod
def get_rpc_consumers(self):
"""Get a list of topics for which an RPC consumer should be created
:return: list -- A list of topics. Each topic in this list is a list
consisting of a name, an operation, and an optional host param
keying the subscription to topic.host for plugin calls.
"""
@abc.abstractmethod
def plug_interface(self, network_id, network_segment, device,
device_owner):
"""Plug the interface (device).
:param network_id: The UUID of the Neutron network
:param network_segment: The NetworkSegment object for this network
:param device: The device that should be plugged
:param device_owner: The device owner of the port
:return: bool -- True if the interface is plugged now. False if the
interface could not be plugged.
"""
@abc.abstractmethod
def setup_arp_spoofing_protection(self, device, device_details):
"""Setup the arp spoofing protection for the given port.
:param device: The device to set up arp spoofing rules for, where
device is the device String that is stored in the Neutron Plug-in
for this Port. E.g. 'tap1'
:param device_details: The device_details map retrieved from the
Neutron Plugin
"""
@abc.abstractmethod
def delete_arp_spoofing_protection(self, devices):
"""Remove the arp spoofing protection for the given ports.
:param devices: List of devices that have been removed, where device
is the device String that is stored for this port in the Neutron
Plug-in. E.g. ['tap1', 'tap2']
"""
@abc.abstractmethod
def delete_unreferenced_arp_protection(self, current_devices):
"""Cleanup arp spoofing protection entries.
:param current_devices: List of devices that currently exist on this
host, where device is the device String that could have been stored
in the Neutron Plug-in. E.g. ['tap1', 'tap2']
"""

View File

@ -0,0 +1,48 @@
# Copyright (c) 2016 IBM Corp.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from neutron.agent.common import config
agent_opts = [
cfg.IntOpt('polling_interval', default=2,
help=_("The number of seconds the agent will wait between "
"polling for local device changes.")),
cfg.IntOpt('quitting_rpc_timeout', default=10,
help=_("Set new timeout in seconds for new rpc calls after "
"agent receives SIGTERM. If value is set to 0, rpc "
"timeout won't be changed")),
# TODO(kevinbenton): The following opt is duplicated between the OVS agent
# and the Linuxbridge agent to make it easy to back-port. These shared opts
# should be moved into a common agent config options location as part of
# the deduplication work.
cfg.BoolOpt('prevent_arp_spoofing', default=True,
help=_("Enable suppression of ARP responses that don't match "
"an IP address that belongs to the port from which "
"they originate. Note: This prevents the VMs attached "
"to this agent from spoofing, it doesn't protect them "
"from other devices which have the capability to spoof "
"(e.g. bare metal or VMs attached to agents without "
"this flag set to True). Spoofing rules will not be "
"added to any ports that have port security disabled. "
"For LinuxBridge, this requires ebtables. For OVS, it "
"requires a version that supports matching ARP "
"headers."))
]
cfg.CONF.register_opts(agent_opts, "AGENT")
config.register_agent_state_opts_helper(cfg.CONF)

View File

@ -15,7 +15,6 @@
from oslo_config import cfg from oslo_config import cfg
from neutron._i18n import _ from neutron._i18n import _
from neutron.agent.common import config
DEFAULT_BRIDGE_MAPPINGS = [] DEFAULT_BRIDGE_MAPPINGS = []
DEFAULT_INTERFACE_MAPPINGS = [] DEFAULT_INTERFACE_MAPPINGS = []
@ -63,34 +62,6 @@ bridge_opts = [
help=_("List of <physical_network>:<physical_bridge>")), help=_("List of <physical_network>:<physical_bridge>")),
] ]
agent_opts = [
cfg.IntOpt('polling_interval', default=2,
help=_("The number of seconds the agent will wait between "
"polling for local device changes.")),
cfg.IntOpt('quitting_rpc_timeout', default=10,
help=_("Set new timeout in seconds for new rpc calls after "
"agent receives SIGTERM. If value is set to 0, rpc "
"timeout won't be changed")),
# TODO(kevinbenton): The following opt is duplicated between the OVS agent
# and the Linuxbridge agent to make it easy to back-port. These shared opts
# should be moved into a common agent config options location as part of
# the deduplication work.
cfg.BoolOpt('prevent_arp_spoofing', default=True,
help=_("Enable suppression of ARP responses that don't match "
"an IP address that belongs to the port from which "
"they originate. Note: This prevents the VMs attached "
"to this agent from spoofing, it doesn't protect them "
"from other devices which have the capability to spoof "
"(e.g. bare metal or VMs attached to agents without "
"this flag set to True). Spoofing rules will not be "
"added to any ports that have port security disabled. "
"For LinuxBridge, this requires ebtables. For OVS, it "
"requires a version that supports matching ARP "
"headers."))
]
cfg.CONF.register_opts(vxlan_opts, "VXLAN") cfg.CONF.register_opts(vxlan_opts, "VXLAN")
cfg.CONF.register_opts(bridge_opts, "LINUX_BRIDGE") cfg.CONF.register_opts(bridge_opts, "LINUX_BRIDGE")
cfg.CONF.register_opts(agent_opts, "AGENT")
config.register_agent_state_opts_helper(cfg.CONF)

View File

@ -46,6 +46,8 @@ from neutron.common import topics
from neutron.common import utils as n_utils from neutron.common import utils as n_utils
from neutron import context from neutron import context
from neutron.plugins.common import constants as p_const from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import config as cagt_config # noqa
from neutron.plugins.ml2.drivers.l2pop.rpc_manager \ from neutron.plugins.ml2.drivers.l2pop.rpc_manager \
import l2population_rpc as l2pop_rpc import l2population_rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.linuxbridge.agent import arp_protect from neutron.plugins.ml2.drivers.linuxbridge.agent import arp_protect
@ -56,19 +58,14 @@ from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
LB_AGENT_BINARY = 'neutron-linuxbridge-agent'
BRIDGE_NAME_PREFIX = "brq" BRIDGE_NAME_PREFIX = "brq"
VXLAN_INTERFACE_PREFIX = "vxlan-" VXLAN_INTERFACE_PREFIX = "vxlan-"
class NetworkSegment(object): class LinuxBridgeManager(amb.CommonAgentManagerBase):
def __init__(self, network_type, physical_network, segmentation_id):
self.network_type = network_type
self.physical_network = physical_network
self.segmentation_id = segmentation_id
class LinuxBridgeManager(object):
def __init__(self, bridge_mappings, interface_mappings): def __init__(self, bridge_mappings, interface_mappings):
super(LinuxBridgeManager, self).__init__()
self.bridge_mappings = bridge_mappings self.bridge_mappings = bridge_mappings
self.interface_mappings = interface_mappings self.interface_mappings = interface_mappings
self.validate_interface_mappings() self.validate_interface_mappings()
@ -82,8 +79,6 @@ class LinuxBridgeManager(object):
self.validate_vxlan_group_with_local_ip() self.validate_vxlan_group_with_local_ip()
self.local_int = device.name self.local_int = device.name
self.check_vxlan_support() self.check_vxlan_support()
# Store network mapping to segments
self.network_map = {}
def validate_interface_mappings(self): def validate_interface_mappings(self):
for physnet, interface in self.interface_mappings.items(): for physnet, interface in self.interface_mappings.items():
@ -462,15 +457,12 @@ class LinuxBridgeManager(object):
phy_dev_mtu = ip_lib.IPDevice(phy_dev_name).link.mtu phy_dev_mtu = ip_lib.IPDevice(phy_dev_name).link.mtu
ip_lib.IPDevice(tap_dev_name).link.set_mtu(phy_dev_mtu) ip_lib.IPDevice(tap_dev_name).link.set_mtu(phy_dev_mtu)
def add_interface(self, network_id, network_type, physical_network, def plug_interface(self, network_id, network_segment, tap_name,
segmentation_id, port_id, device_owner): device_owner):
self.network_map[network_id] = NetworkSegment(network_type, return self.add_tap_interface(network_id, network_segment.network_type,
physical_network, network_segment.physical_network,
segmentation_id) network_segment.segmentation_id,
tap_device_name = self.get_tap_device_name(port_id) tap_name, device_owner)
return self.add_tap_interface(network_id, network_type,
physical_network, segmentation_id,
tap_device_name, device_owner)
def delete_bridge(self, bridge_name): def delete_bridge(self, bridge_name):
bridge_device = bridge_lib.BridgeDevice(bridge_name) bridge_device = bridge_lib.BridgeDevice(bridge_name)
@ -538,7 +530,7 @@ class LinuxBridgeManager(object):
device.link.delete() device.link.delete()
LOG.debug("Done deleting interface %s", interface) LOG.debug("Done deleting interface %s", interface)
def get_tap_devices(self): def get_all_devices(self):
devices = set() devices = set()
for device in bridge_lib.get_bridge_names(): for device in bridge_lib.get_bridge_names():
if device.startswith(constants.TAP_DEVICE_PREFIX): if device.startswith(constants.TAP_DEVICE_PREFIX):
@ -660,9 +652,66 @@ class LinuxBridgeManager(object):
elif self.vxlan_mode == lconst.VXLAN_UCAST: elif self.vxlan_mode == lconst.VXLAN_UCAST:
self.remove_fdb_bridge_entry(mac, agent_ip, interface) self.remove_fdb_bridge_entry(mac, agent_ip, interface)
def get_agent_id(self):
if self.bridge_mappings:
mac = utils.get_interface_mac(self.bridge_mappings.values[0])
else:
devices = ip_lib.IPWrapper().get_devices(True)
if devices:
mac = utils.get_interface_mac(devices[0].name)
else:
LOG.error(_LE("Unable to obtain MAC address for unique ID. "
"Agent terminated!"))
sys.exit(1)
return 'lb%s' % mac.replace(":", "")
class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def get_agent_configurations(self):
l2pop_rpc.L2populationRpcCallBackMixin): configurations = {'bridge_mappings': self.bridge_mappings,
'interface_mappings': self.interface_mappings
}
if self.vxlan_mode != lconst.VXLAN_NONE:
configurations['tunneling_ip'] = self.local_ip
configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
configurations['l2_population'] = cfg.CONF.VXLAN.l2_population
return configurations
def get_rpc_callbacks(self, context, agent, sg_agent):
return LinuxBridgeRpcCallbacks(context, agent, sg_agent)
def get_rpc_consumers(self):
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[topics.NETWORK, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
if cfg.CONF.VXLAN.l2_population:
consumers.append([topics.L2POPULATION, topics.UPDATE])
return consumers
def ensure_port_admin_state(self, tap_name, admin_state_up):
LOG.debug("Setting admin_state_up to %s for device %s",
admin_state_up, tap_name)
if admin_state_up:
ip_lib.IPDevice(tap_name).link.set_up()
else:
ip_lib.IPDevice(tap_name).link.set_down()
def setup_arp_spoofing_protection(self, device, device_details):
arp_protect.setup_arp_spoofing_protection(device, device_details)
def delete_arp_spoofing_protection(self, devices):
arp_protect.delete_arp_spoofing_protection(devices)
def delete_unreferenced_arp_protection(self, current_devices):
arp_protect.delete_unreferenced_arp_protection(current_devices)
def get_extension_driver_type(self):
return lconst.EXTENSION_DRIVER_TYPE
class LinuxBridgeRpcCallbacks(
sg_rpc.SecurityGroupAgentRpcCallbackMixin,
l2pop_rpc.L2populationRpcCallBackMixin,
amb.CommonAgentManagerRpcCallBackBase):
# Set RPC API version to 1.0 by default. # Set RPC API version to 1.0 by default.
# history # history
@ -671,20 +720,14 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# 1.4 Added support for network_update # 1.4 Added support for network_update
target = oslo_messaging.Target(version='1.4') target = oslo_messaging.Target(version='1.4')
def __init__(self, context, agent, sg_agent):
super(LinuxBridgeRpcCallbacks, self).__init__()
self.context = context
self.agent = agent
self.sg_agent = sg_agent
def network_delete(self, context, **kwargs): def network_delete(self, context, **kwargs):
LOG.debug("network_delete received") LOG.debug("network_delete received")
network_id = kwargs.get('network_id') network_id = kwargs.get('network_id')
# NOTE(nick-ma-z): Don't remove pre-existing user-defined bridges # NOTE(nick-ma-z): Don't remove pre-existing user-defined bridges
if network_id in self.agent.br_mgr.network_map: if network_id in self.network_map:
phynet = self.agent.br_mgr.network_map[network_id].physical_network phynet = self.network_map[network_id].physical_network
if phynet and phynet in self.agent.br_mgr.bridge_mappings: if phynet and phynet in self.agent.mgr.bridge_mappings:
LOG.info(_LI("Physical network %s is defined in " LOG.info(_LI("Physical network %s is defined in "
"bridge_mappings and cannot be deleted."), "bridge_mappings and cannot be deleted."),
network_id) network_id)
@ -693,18 +736,18 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.error(_LE("Network %s is not available."), network_id) LOG.error(_LE("Network %s is not available."), network_id)
return return
bridge_name = self.agent.br_mgr.get_bridge_name(network_id) bridge_name = self.agent.mgr.get_bridge_name(network_id)
LOG.debug("Delete %s", bridge_name) LOG.debug("Delete %s", bridge_name)
self.agent.br_mgr.delete_bridge(bridge_name) self.agent.mgr.delete_bridge(bridge_name)
def port_update(self, context, **kwargs): def port_update(self, context, **kwargs):
port_id = kwargs['port']['id'] port_id = kwargs['port']['id']
tap_name = self.agent.br_mgr.get_tap_device_name(port_id) device_name = self.agent.mgr.get_tap_device_name(port_id)
# Put the tap name in the updated_devices set. # Put the device name in the updated_devices set.
# Do not store port details, as if they're used for processing # Do not store port details, as if they're used for processing
# notifications there is no guarantee the notifications are # notifications there is no guarantee the notifications are
# processed in the same order as the relevant API requests. # processed in the same order as the relevant API requests.
self.agent.updated_devices.add(tap_name) self.updated_devices.add(device_name)
LOG.debug("port_update RPC received for port: %s", port_id) LOG.debug("port_update RPC received for port: %s", port_id)
def network_update(self, context, **kwargs): def network_update(self, context, **kwargs):
@ -714,76 +757,76 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
{'network_id': network_id, {'network_id': network_id,
'ports': self.agent.network_ports[network_id]}) 'ports': self.agent.network_ports[network_id]})
for port_data in self.agent.network_ports[network_id]: for port_data in self.agent.network_ports[network_id]:
self.agent.updated_devices.add(port_data['device']) self.updated_devices.add(port_data['device'])
def fdb_add(self, context, fdb_entries): def fdb_add(self, context, fdb_entries):
LOG.debug("fdb_add received") LOG.debug("fdb_add received")
for network_id, values in fdb_entries.items(): for network_id, values in fdb_entries.items():
segment = self.agent.br_mgr.network_map.get(network_id) segment = self.network_map.get(network_id)
if not segment: if not segment:
return return
if segment.network_type != p_const.TYPE_VXLAN: if segment.network_type != p_const.TYPE_VXLAN:
return return
interface = self.agent.br_mgr.get_vxlan_device_name( interface = self.agent.mgr.get_vxlan_device_name(
segment.segmentation_id) segment.segmentation_id)
agent_ports = values.get('ports') agent_ports = values.get('ports')
for agent_ip, ports in agent_ports.items(): for agent_ip, ports in agent_ports.items():
if agent_ip == self.agent.br_mgr.local_ip: if agent_ip == self.agent.mgr.local_ip:
continue continue
self.agent.br_mgr.add_fdb_entries(agent_ip, self.agent.mgr.add_fdb_entries(agent_ip,
ports, ports,
interface) interface)
def fdb_remove(self, context, fdb_entries): def fdb_remove(self, context, fdb_entries):
LOG.debug("fdb_remove received") LOG.debug("fdb_remove received")
for network_id, values in fdb_entries.items(): for network_id, values in fdb_entries.items():
segment = self.agent.br_mgr.network_map.get(network_id) segment = self.network_map.get(network_id)
if not segment: if not segment:
return return
if segment.network_type != p_const.TYPE_VXLAN: if segment.network_type != p_const.TYPE_VXLAN:
return return
interface = self.agent.br_mgr.get_vxlan_device_name( interface = self.agent.mgr.get_vxlan_device_name(
segment.segmentation_id) segment.segmentation_id)
agent_ports = values.get('ports') agent_ports = values.get('ports')
for agent_ip, ports in agent_ports.items(): for agent_ip, ports in agent_ports.items():
if agent_ip == self.agent.br_mgr.local_ip: if agent_ip == self.agent.mgr.local_ip:
continue continue
self.agent.br_mgr.remove_fdb_entries(agent_ip, self.agent.mgr.remove_fdb_entries(agent_ip,
ports, ports,
interface) interface)
def _fdb_chg_ip(self, context, fdb_entries): def _fdb_chg_ip(self, context, fdb_entries):
LOG.debug("update chg_ip received") LOG.debug("update chg_ip received")
for network_id, agent_ports in fdb_entries.items(): for network_id, agent_ports in fdb_entries.items():
segment = self.agent.br_mgr.network_map.get(network_id) segment = self.network_map.get(network_id)
if not segment: if not segment:
return return
if segment.network_type != p_const.TYPE_VXLAN: if segment.network_type != p_const.TYPE_VXLAN:
return return
interface = self.agent.br_mgr.get_vxlan_device_name( interface = self.agent.mgr.get_vxlan_device_name(
segment.segmentation_id) segment.segmentation_id)
for agent_ip, state in agent_ports.items(): for agent_ip, state in agent_ports.items():
if agent_ip == self.agent.br_mgr.local_ip: if agent_ip == self.agent.mgr.local_ip:
continue continue
after = state.get('after', []) after = state.get('after', [])
for mac, ip in after: for mac, ip in after:
self.agent.br_mgr.add_fdb_ip_entry(mac, ip, interface) self.agent.mgr.add_fdb_ip_entry(mac, ip, interface)
before = state.get('before', []) before = state.get('before', [])
for mac, ip in before: for mac, ip in before:
self.agent.br_mgr.remove_fdb_ip_entry(mac, ip, interface) self.agent.mgr.remove_fdb_ip_entry(mac, ip, interface)
def fdb_update(self, context, fdb_entries): def fdb_update(self, context, fdb_entries):
LOG.debug("fdb_update received") LOG.debug("fdb_update received")
@ -795,57 +838,55 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
getattr(self, method)(context, values) getattr(self, method)(context, values)
class LinuxBridgeNeutronAgentRPC(service.Service): class CommonAgentLoop(service.Service):
def __init__(self, bridge_mappings, interface_mappings, polling_interval, def __init__(self, manager, polling_interval,
quitting_rpc_timeout): quitting_rpc_timeout, agent_type, agent_binary):
"""Constructor. """Constructor.
:param bridge_mappings: dict mapping physical_networks to :param manager: the manager object containing the impl specifics
physical_bridges.
:param interface_mappings: dict mapping physical_networks to
physical_interfaces.
:param polling_interval: interval (secs) to poll DB. :param polling_interval: interval (secs) to poll DB.
:param quitting_rpc_timeout: timeout in seconds for rpc calls after :param quitting_rpc_timeout: timeout in seconds for rpc calls after
stop is called. stop is called.
:param agent_type: Specifies the type of the agent
:param agent_binary: The agent binary string
""" """
super(LinuxBridgeNeutronAgentRPC, self).__init__() super(CommonAgentLoop, self).__init__()
self.interface_mappings = interface_mappings self.mgr = manager
self.bridge_mappings = bridge_mappings self._validate_manager_class()
self.polling_interval = polling_interval self.polling_interval = polling_interval
self.quitting_rpc_timeout = quitting_rpc_timeout self.quitting_rpc_timeout = quitting_rpc_timeout
self.agent_type = agent_type
self.agent_binary = agent_binary
def _validate_manager_class(self):
if not isinstance(self.mgr,
amb.CommonAgentManagerBase):
LOG.error(_LE("Manager class must inherit from "
"CommonAgentManagerBase to ensure CommonAgent "
"works properly."))
sys.exit(1)
def start(self): def start(self):
self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing
self.setup_linux_bridge(self.bridge_mappings, self.interface_mappings)
# stores received port_updates and port_deletes for
# processing by the main loop
self.updated_devices = set()
# stores all configured ports on agent # stores all configured ports on agent
self.network_ports = collections.defaultdict(list) self.network_ports = collections.defaultdict(list)
# flag to do a sync after revival # flag to do a sync after revival
self.fullsync = False self.fullsync = False
self.context = context.get_admin_context_without_session() self.context = context.get_admin_context_without_session()
self.setup_rpc(self.interface_mappings.values()) self.setup_rpc()
self.init_extension_manager(self.connection) self.init_extension_manager(self.connection)
configurations = { configurations = {'extensions': self.ext_manager.names()}
'bridge_mappings': self.bridge_mappings, configurations.update(self.mgr.get_agent_configurations())
'interface_mappings': self.interface_mappings,
'extensions': self.ext_manager.names()
}
if self.br_mgr.vxlan_mode != lconst.VXLAN_NONE:
configurations['tunneling_ip'] = self.br_mgr.local_ip
configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
configurations['l2_population'] = cfg.CONF.VXLAN.l2_population
self.agent_state = { self.agent_state = {
'binary': 'neutron-linuxbridge-agent', 'binary': self.agent_binary,
'host': cfg.CONF.host, 'host': cfg.CONF.host,
'topic': constants.L2_AGENT_TOPIC, 'topic': constants.L2_AGENT_TOPIC,
'configurations': configurations, 'configurations': configurations,
'agent_type': constants.AGENT_TYPE_LINUXBRIDGE, 'agent_type': self.agent_type,
'start_flag': True} 'start_flag': True}
report_interval = cfg.CONF.AGENT.report_interval report_interval = cfg.CONF.AGENT.report_interval
@ -856,17 +897,17 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
self.daemon_loop() self.daemon_loop()
def stop(self, graceful=True): def stop(self, graceful=True):
LOG.info(_LI("Stopping linuxbridge agent.")) LOG.info(_LI("Stopping %s agent."), self.agent_type)
if graceful and self.quitting_rpc_timeout: if graceful and self.quitting_rpc_timeout:
self.set_rpc_timeout(self.quitting_rpc_timeout) self.set_rpc_timeout(self.quitting_rpc_timeout)
super(LinuxBridgeNeutronAgentRPC, self).stop(graceful) super(CommonAgentLoop, self).stop(graceful)
def reset(self): def reset(self):
common_config.setup_logging() common_config.setup_logging()
def _report_state(self): def _report_state(self):
try: try:
devices = len(self.br_mgr.get_tap_devices()) devices = len(self.mgr.get_all_devices())
self.agent_state.get('configurations')['devices'] = devices self.agent_state.get('configurations')['devices'] = devices
agent_status = self.state_rpc.report_state(self.context, agent_status = self.state_rpc.report_state(self.context,
self.agent_state, self.agent_state,
@ -879,40 +920,33 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
except Exception: except Exception:
LOG.exception(_LE("Failed reporting state!")) LOG.exception(_LE("Failed reporting state!"))
def setup_rpc(self, physical_interfaces): def _validate_rpc_endpoints(self):
if physical_interfaces: if not isinstance(self.endpoints[0],
mac = utils.get_interface_mac(physical_interfaces[0]) amb.CommonAgentManagerRpcCallBackBase):
else: LOG.error(_LE("RPC Callback class must inherit from "
devices = ip_lib.IPWrapper().get_devices(True) "CommonAgentManagerRpcCallBackBase to ensure "
if devices: "CommonAgent works properly."))
mac = utils.get_interface_mac(devices[0].name) sys.exit(1)
else:
LOG.error(_LE("Unable to obtain MAC address for unique ID. "
"Agent terminated!"))
sys.exit(1)
def setup_rpc(self):
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN) self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN) self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.sg_agent = sg_rpc.SecurityGroupAgentRpc( self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
self.context, self.sg_plugin_rpc, defer_refresh_firewall=True) self.context, self.sg_plugin_rpc, defer_refresh_firewall=True)
self.agent_id = '%s%s' % ('lb', (mac.replace(":", ""))) self.agent_id = self.mgr.get_agent_id()
LOG.info(_LI("RPC agent_id: %s"), self.agent_id) LOG.info(_LI("RPC agent_id: %s"), self.agent_id)
self.topic = topics.AGENT self.topic = topics.AGENT
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS) self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
# RPC network init # RPC network init
# Handle updates from service # Handle updates from service
self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self, self.rpc_callbacks = self.mgr.get_rpc_callbacks(self.context, self,
self.sg_agent)] self.sg_agent)
self.endpoints = [self.rpc_callbacks]
self._validate_rpc_endpoints()
# Define the listening consumers for the agent # Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE], consumers = self.mgr.get_rpc_consumers()
[topics.NETWORK, topics.DELETE],
[topics.NETWORK, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
if cfg.CONF.VXLAN.l2_population:
consumers.append([topics.L2POPULATION, topics.UPDATE])
self.connection = agent_rpc.create_consumers(self.endpoints, self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic, self.topic,
consumers) consumers)
@ -922,19 +956,7 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
self.ext_manager = ( self.ext_manager = (
ext_manager.AgentExtensionsManager(cfg.CONF)) ext_manager.AgentExtensionsManager(cfg.CONF))
self.ext_manager.initialize( self.ext_manager.initialize(
connection, lconst.EXTENSION_DRIVER_TYPE) connection, self.mgr.get_extension_driver_type())
def setup_linux_bridge(self, bridge_mappings, interface_mappings):
self.br_mgr = LinuxBridgeManager(bridge_mappings, interface_mappings)
def _ensure_port_admin_state(self, port_id, admin_state_up):
LOG.debug("Setting admin_state_up to %s for port %s",
admin_state_up, port_id)
tap_name = self.br_mgr.get_tap_device_name(port_id)
if admin_state_up:
ip_lib.IPDevice(tap_name).link.set_up()
else:
ip_lib.IPDevice(tap_name).link.set_down()
def _clean_network_ports(self, device): def _clean_network_ports(self, device):
for netid, ports_list in self.network_ports.items(): for netid, ports_list in self.network_ports.items():
@ -988,17 +1010,19 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
LOG.info(_LI("Port %(device)s updated. Details: %(details)s"), LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
{'device': device, 'details': device_details}) {'device': device, 'details': device_details})
if self.prevent_arp_spoofing: if self.prevent_arp_spoofing:
port = self.br_mgr.get_tap_device_name( self.mgr.setup_arp_spoofing_protection(device,
device_details['port_id']) device_details)
arp_protect.setup_arp_spoofing_protection(port,
device_details) segment = amb.NetworkSegment(
# create the networking for the port device_details.get('network_type'),
network_type = device_details.get('network_type') device_details['physical_network'],
segmentation_id = device_details.get('segmentation_id') device_details.get('segmentation_id')
tap_in_bridge = self.br_mgr.add_interface( )
device_details['network_id'], network_type, network_id = device_details['network_id']
device_details['physical_network'], segmentation_id, self.rpc_callbacks.add_network(network_id, segment)
device_details['port_id'], device_details['device_owner']) interface_plugged = self.mgr.plug_interface(
network_id, segment,
device, device_details['device_owner'])
# REVISIT(scheuran): Changed the way how ports admin_state_up # REVISIT(scheuran): Changed the way how ports admin_state_up
# is implemented. # is implemented.
# #
@ -1007,7 +1031,7 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
# - admin_state_down: remove tap from bridge # - admin_state_down: remove tap from bridge
# New lb implementation: # New lb implementation:
# - admin_state_up: set tap device state to up # - admin_state_up: set tap device state to up
# - admin_state_down: set tap device stae to down # - admin_state_down: set tap device state to down
# #
# However both approaches could result in races with # However both approaches could result in races with
# nova/libvirt and therefore to an invalid system state in the # nova/libvirt and therefore to an invalid system state in the
@ -1037,11 +1061,12 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
# 1) An existing race with libvirt caused by the behavior of # 1) An existing race with libvirt caused by the behavior of
# the old implementation. See Bug #1312016 # the old implementation. See Bug #1312016
# 2) The new code is much more readable # 2) The new code is much more readable
self._ensure_port_admin_state(device_details['port_id'], self.mgr.ensure_port_admin_state(
device_details['admin_state_up']) device,
device_details['admin_state_up'])
# update plugin about port status if admin_state is up # update plugin about port status if admin_state is up
if device_details['admin_state_up']: if device_details['admin_state_up']:
if tap_in_bridge: if interface_plugged:
self.plugin_rpc.update_device_up(self.context, self.plugin_rpc.update_device_up(self.context,
device, device,
self.agent_id, self.agent_id,
@ -1057,6 +1082,7 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
self.ext_manager.handle_port(self.context, device_details) self.ext_manager.handle_port(self.context, device_details)
else: else:
LOG.info(_LI("Device %s not defined on plugin"), device) LOG.info(_LI("Device %s not defined on plugin"), device)
# no resync is needed
return False return False
def treat_devices_removed(self, devices): def treat_devices_removed(self, devices):
@ -1083,19 +1109,15 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
{'device': device, {'device': device,
'port_id': port_id}) 'port_id': port_id})
if self.prevent_arp_spoofing: if self.prevent_arp_spoofing:
arp_protect.delete_arp_spoofing_protection(devices) self.mgr.delete_arp_spoofing_protection(devices)
return resync return resync
def scan_devices(self, previous, sync): def scan_devices(self, previous, sync):
device_info = {} device_info = {}
# Save and reinitialize the set variable that the port_update RPC uses. updated_devices = self.rpc_callbacks.get_and_clear_updated_devices()
# This should be thread-safe as the greenthread should not yield
# between these two statements.
updated_devices = self.updated_devices
self.updated_devices = set()
current_devices = self.br_mgr.get_tap_devices() current_devices = self.mgr.get_all_devices()
device_info['current'] = current_devices device_info['current'] = current_devices
if previous is None: if previous is None:
@ -1107,7 +1129,7 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
# clear any orphaned ARP spoofing rules (e.g. interface was # clear any orphaned ARP spoofing rules (e.g. interface was
# manually deleted) # manually deleted)
if self.prevent_arp_spoofing: if self.prevent_arp_spoofing:
arp_protect.delete_unreferenced_arp_protection(current_devices) self.mgr.delete_unreferenced_arp_protection(current_devices)
if sync: if sync:
# This is the first iteration, or the previous one had a problem. # This is the first iteration, or the previous one had a problem.
@ -1202,12 +1224,13 @@ def main():
sys.exit(1) sys.exit(1)
LOG.info(_LI("Bridge mappings: %s"), bridge_mappings) LOG.info(_LI("Bridge mappings: %s"), bridge_mappings)
manager = LinuxBridgeManager(bridge_mappings, interface_mappings)
polling_interval = cfg.CONF.AGENT.polling_interval polling_interval = cfg.CONF.AGENT.polling_interval
quitting_rpc_timeout = cfg.CONF.AGENT.quitting_rpc_timeout quitting_rpc_timeout = cfg.CONF.AGENT.quitting_rpc_timeout
agent = LinuxBridgeNeutronAgentRPC(bridge_mappings, agent = CommonAgentLoop(manager, polling_interval, quitting_rpc_timeout,
interface_mappings, constants.AGENT_TYPE_LINUXBRIDGE,
polling_interval, LB_AGENT_BINARY)
quitting_rpc_timeout)
LOG.info(_LI("Agent initialized successfully, now running... ")) LOG.info(_LI("Agent initialized successfully, now running... "))
launcher = service.launch(cfg.CONF, agent) launcher = service.launch(cfg.CONF, agent)
launcher.wait() launcher.wait()

View File

@ -0,0 +1,48 @@
# Copyright (c) 2016 IBM Corp.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.tests import base
class RPCCallBackImpl(amb.CommonAgentManagerRpcCallBackBase):
def security_groups_rule_updated(self, context, **kwargs):
pass
def security_groups_member_updated(self, context, **kwargs):
pass
def security_groups_provider_updated(self, context, **kwargs):
pass
class Test_CommonAgentManagerRpcCallBackBase(base.BaseTestCase):
def setUp(self):
super(Test_CommonAgentManagerRpcCallBackBase, self).setUp()
self.rpc_callbacks = RPCCallBackImpl(None, None, None)
def test_get_and_clear_updated_devices(self):
updated_devices = ['tap1', 'tap2']
self.rpc_callbacks.updated_devices = updated_devices
self.assertEqual(updated_devices,
self.rpc_callbacks.get_and_clear_updated_devices())
self.assertEqual(set(), self.rpc_callbacks.updated_devices)
def test_add_network(self):
segment = amb.NetworkSegment('vlan', 'physnet1', 100)
network_id = "foo"
self.rpc_callbacks.add_network(network_id, segment)
self.assertEqual(segment, self.rpc_callbacks.network_map[network_id])

View File

@ -24,7 +24,7 @@ from neutron.agent.linux import utils
from neutron.common import constants from neutron.common import constants
from neutron.common import exceptions from neutron.common import exceptions
from neutron.plugins.common import constants as p_const from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.linuxbridge.agent import arp_protect from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.linuxbridge.agent.common \ from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
import constants as lconst import constants as lconst
from neutron.plugins.ml2.drivers.linuxbridge.agent \ from neutron.plugins.ml2.drivers.linuxbridge.agent \
@ -106,43 +106,37 @@ class TestLinuxBridge(base.BaseTestCase):
self.assertTrue(vxlan_bridge_func.called) self.assertTrue(vxlan_bridge_func.called)
class TestLinuxBridgeAgent(base.BaseTestCase): class TestCommonAgentLoop(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestLinuxBridgeAgent, self).setUp() super(TestCommonAgentLoop, self).setUp()
# disable setting up periodic state reporting # disable setting up periodic state reporting
cfg.CONF.set_override('report_interval', 0, 'AGENT') cfg.CONF.set_override('report_interval', 0, 'AGENT')
cfg.CONF.set_override('prevent_arp_spoofing', False, 'AGENT') cfg.CONF.set_override('prevent_arp_spoofing', False, 'AGENT')
cfg.CONF.set_default('firewall_driver', cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver', 'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP') group='SECURITYGROUP')
cfg.CONF.set_default('quitting_rpc_timeout', 10, 'AGENT')
cfg.CONF.set_override('local_ip', LOCAL_IP, 'VXLAN') cfg.CONF.set_override('local_ip', LOCAL_IP, 'VXLAN')
self.get_devices_p = mock.patch.object(ip_lib.IPWrapper, 'get_devices')
self.get_devices = self.get_devices_p.start()
self.get_devices.return_value = [ip_lib.IPDevice('eth77')]
self.get_mac_p = mock.patch('neutron.agent.linux.utils.'
'get_interface_mac')
self.get_mac = self.get_mac_p.start()
self.get_mac.return_value = '00:00:00:00:00:01'
self.get_bridge_names_p = mock.patch.object(bridge_lib, self.get_bridge_names_p = mock.patch.object(bridge_lib,
'get_bridge_names') 'get_bridge_names')
self.get_bridge_names = self.get_bridge_names_p.start() self.get_bridge_names = self.get_bridge_names_p.start()
self.get_bridge_names.return_value = ["br-int", "brq1"] self.get_bridge_names.return_value = ["br-int", "brq1"]
with mock.patch.object(ip_lib.IPWrapper,
'get_device_by_ip', manager = mock.Mock()
return_value=FAKE_DEFAULT_DEV): manager.get_all_devices.return_value = []
self.agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC( manager.get_agent_configurations.return_value = {}
{}, {}, 0, cfg.CONF.AGENT.quitting_rpc_timeout) manager.get_rpc_consumers.return_value = []
with mock.patch.object(self.agent, "daemon_loop"),\ with mock.patch.object(linuxbridge_neutron_agent.CommonAgentLoop,
mock.patch.object( '_validate_manager_class'), \
linuxbridge_neutron_agent.LinuxBridgeManager, mock.patch.object(linuxbridge_neutron_agent.CommonAgentLoop,
'check_vxlan_support'): '_validate_rpc_endpoints'):
self.agent = linuxbridge_neutron_agent.CommonAgentLoop(
manager, 0, 10, 'fake_agent', 'foo-binary')
with mock.patch.object(self.agent, "daemon_loop"):
self.agent.start() self.agent.start()
def test_treat_devices_removed_with_existed_device(self): def test_treat_devices_removed_with_existed_device(self):
agent = self.agent agent = self.agent
agent._ensure_port_admin_state = mock.Mock() agent.mgr.ensure_port_admin_state = mock.Mock()
devices = [DEVICE_1] devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA) agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc, with mock.patch.object(agent.plugin_rpc,
@ -220,17 +214,18 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
"remove_devices_filter"): "remove_devices_filter"):
fn_udd.return_value = {'device': DEVICE_1, fn_udd.return_value = {'device': DEVICE_1,
'exists': True} 'exists': True}
with mock.patch.object(arp_protect, with mock.patch.object(agent.mgr,
'delete_arp_spoofing_protection') as de_arp: 'delete_arp_spoofing_protection') as de_arp:
agent.treat_devices_removed(devices) agent.treat_devices_removed(devices)
de_arp.assert_called_with(devices) de_arp.assert_called_with(devices)
def _test_scan_devices(self, previous, updated, def _test_scan_devices(self, previous, updated,
fake_current, expected, sync): fake_current, expected, sync):
self.agent.br_mgr = mock.Mock() self.agent.mgr = mock.Mock()
self.agent.br_mgr.get_tap_devices.return_value = fake_current self.agent.mgr.get_all_devices.return_value = fake_current
self.agent.updated_devices = updated self.agent.rpc_callbacks.get_and_clear_updated_devices.return_value =\
updated
results = self.agent.scan_devices(previous, sync) results = self.agent.scan_devices(previous, sync)
self.assertEqual(expected, results) self.assertEqual(expected, results)
@ -371,11 +366,10 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
'updated': set(), 'updated': set(),
'added': set([1, 2]), 'added': set([1, 2]),
'removed': set()} 'removed': set()}
with mock.patch.object(arp_protect, self._test_scan_devices(previous, updated, fake_current, expected,
'delete_unreferenced_arp_protection') as de_arp:
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False) sync=False)
de_arp.assert_called_with(fake_current) self.agent.mgr.delete_unreferenced_arp_protection.assert_called_with(
fake_current)
def test_process_network_devices(self): def test_process_network_devices(self):
agent = self.agent agent = self.agent
@ -414,21 +408,29 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
agent.ext_manager = mock.Mock() agent.ext_manager = mock.Mock()
agent.plugin_rpc = mock.Mock() agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details] agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.br_mgr = mock.Mock() agent.mgr = mock.Mock()
agent.br_mgr.add_interface.return_value = True agent.mgr.plug_interface.return_value = True
agent._ensure_port_admin_state = mock.Mock() agent.mgr.ensure_port_admin_state = mock.Mock()
resync_needed = agent.treat_devices_added_updated(set(['tap1'])) mock_segment = amb.NetworkSegment(mock_details['network_type'],
mock_details['physical_network'],
mock_details['segmentation_id'])
self.assertFalse(resync_needed) with mock.patch('neutron.plugins.ml2.drivers.agent.'
agent.br_mgr.add_interface.assert_called_with( '_agent_manager_base.NetworkSegment',
'net123', 'vlan', 'physnet1', return_value=mock_segment):
100, 'port123', resync_needed = agent.treat_devices_added_updated(set(['tap1']))
constants.DEVICE_OWNER_NETWORK_PREFIX)
self.assertTrue(agent.plugin_rpc.update_device_up.called) self.assertFalse(resync_needed)
self.assertTrue(agent.ext_manager.handle_port.called) agent.rpc_callbacks.add_network.assert_called_with('net123',
self.assertTrue( mock_segment)
mock_port_data in agent.network_ports[mock_details['network_id']] agent.mgr.plug_interface.assert_called_with(
) 'net123', mock_segment, 'dev123',
constants.DEVICE_OWNER_NETWORK_PREFIX)
self.assertTrue(agent.plugin_rpc.update_device_up.called)
self.assertTrue(agent.ext_manager.handle_port.called)
self.assertTrue(mock_port_data in agent.network_ports[
mock_details['network_id']]
)
def test_treat_devices_added_updated_prevent_arp_spoofing_true(self): def test_treat_devices_added_updated_prevent_arp_spoofing_true(self):
agent = self.agent agent = self.agent
@ -441,17 +443,14 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
'segmentation_id': 100, 'segmentation_id': 100,
'physical_network': 'physnet1', 'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX} 'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
tap_name = constants.TAP_DEVICE_PREFIX + mock_details['port_id']
agent.plugin_rpc = mock.Mock() agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details] agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.br_mgr = mock.Mock() agent.mgr = mock.Mock()
agent.br_mgr.add_interface.return_value = True agent.mgr.plug_interface.return_value = True
agent.br_mgr.get_tap_device_name.return_value = tap_name with mock.patch.object(agent.mgr,
agent._ensure_port_admin_state = mock.Mock()
with mock.patch.object(arp_protect,
'setup_arp_spoofing_protection') as set_arp: 'setup_arp_spoofing_protection') as set_arp:
agent.treat_devices_added_updated(set(['tap1'])) agent.treat_devices_added_updated(set(['tap1']))
set_arp.assert_called_with(tap_name, mock_details) set_arp.assert_called_with(mock_details['device'], mock_details)
def test_set_rpc_timeout(self): def test_set_rpc_timeout(self):
self.agent.stop() self.agent.stop()
@ -474,23 +473,6 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
self.agent._report_state() self.agent._report_state()
self.assertTrue(self.agent.fullsync) self.assertTrue(self.agent.fullsync)
def _test_ensure_port_admin_state(self, admin_state):
port_id = 'fake_id'
with mock.patch.object(ip_lib, 'IPDevice') as dev_mock:
self.agent._ensure_port_admin_state(port_id, admin_state)
tap_name = self.agent.br_mgr.get_tap_device_name(port_id)
self.assertEqual(admin_state,
dev_mock(tap_name).link.set_up.called)
self.assertNotEqual(admin_state,
dev_mock(tap_name).link.set_down.called)
def test_ensure_port_admin_state_up(self):
self._test_ensure_port_admin_state(True)
def test_ensure_port_admin_state_down(self):
self._test_ensure_port_admin_state(False)
def test_update_network_ports(self): def test_update_network_ports(self):
port_1_data = PORT_DATA port_1_data = PORT_DATA
NETWORK_2_ID = 'fake_second_network' NETWORK_2_ID = 'fake_second_network'
@ -983,10 +965,10 @@ class TestLinuxBridgeManager(base.BaseTestCase):
def test_add_tap_interface_owner_neutron(self): def test_add_tap_interface_owner_neutron(self):
self._test_add_tap_interface(constants.DEVICE_OWNER_NEUTRON_PREFIX) self._test_add_tap_interface(constants.DEVICE_OWNER_NEUTRON_PREFIX)
def test_add_interface(self): def test_plug_interface(self):
segment = amb.NetworkSegment(p_const.TYPE_VLAN, "physnet-1", "1")
with mock.patch.object(self.lbm, "add_tap_interface") as add_tap: with mock.patch.object(self.lbm, "add_tap_interface") as add_tap:
self.lbm.add_interface("123", p_const.TYPE_VLAN, "physnet-1", self.lbm.plug_interface("123", segment, "tap234",
"1", "234",
constants.DEVICE_OWNER_NETWORK_PREFIX) constants.DEVICE_OWNER_NETWORK_PREFIX)
add_tap.assert_called_with("123", p_const.TYPE_VLAN, "physnet-1", add_tap.assert_called_with("123", p_const.TYPE_VLAN, "physnet-1",
"1", "tap234", "1", "tap234",
@ -1211,6 +1193,23 @@ class TestLinuxBridgeManager(base.BaseTestCase):
vxlan_group='224.0.0.1', vxlan_group='224.0.0.1',
iproute_arg_supported=True) iproute_arg_supported=True)
def _test_ensure_port_admin_state(self, admin_state):
port_id = 'fake_id'
with mock.patch.object(ip_lib, 'IPDevice') as dev_mock:
self.lbm.ensure_port_admin_state(port_id, admin_state)
tap_name = self.lbm.get_tap_device_name(port_id)
self.assertEqual(admin_state,
dev_mock(tap_name).link.set_up.called)
self.assertNotEqual(admin_state,
dev_mock(tap_name).link.set_down.called)
def test_ensure_port_admin_state_up(self):
self._test_ensure_port_admin_state(True)
def test_ensure_port_admin_state_down(self):
self._test_ensure_port_admin_state(False)
class TestLinuxBridgeRpcCallbacks(base.BaseTestCase): class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
def setUp(self): def setUp(self):
@ -1219,15 +1218,10 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
class FakeLBAgent(object): class FakeLBAgent(object):
def __init__(self): def __init__(self):
self.agent_id = 1 self.agent_id = 1
self.br_mgr = get_linuxbridge_manager( self.mgr = get_linuxbridge_manager(
BRIDGE_MAPPINGS, INTERFACE_MAPPINGS) BRIDGE_MAPPINGS, INTERFACE_MAPPINGS)
self.br_mgr.vxlan_mode = lconst.VXLAN_UCAST self.mgr.vxlan_mode = lconst.VXLAN_UCAST
segment = mock.Mock()
segment.network_type = 'vxlan'
segment.segmentation_id = 1
self.br_mgr.network_map['net_id'] = segment
self.updated_devices = set()
self.network_ports = collections.defaultdict(list) self.network_ports = collections.defaultdict(list)
self.lb_rpc = linuxbridge_neutron_agent.LinuxBridgeRpcCallbacks( self.lb_rpc = linuxbridge_neutron_agent.LinuxBridgeRpcCallbacks(
@ -1236,15 +1230,20 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
object() object()
) )
segment = mock.Mock()
segment.network_type = 'vxlan'
segment.segmentation_id = 1
self.lb_rpc.network_map['net_id'] = segment
def test_network_delete(self): def test_network_delete(self):
mock_net = mock.Mock() mock_net = mock.Mock()
mock_net.physical_network = None mock_net.physical_network = None
self.lb_rpc.agent.br_mgr.network_map = {NETWORK_ID: mock_net} self.lb_rpc.network_map = {NETWORK_ID: mock_net}
with mock.patch.object(self.lb_rpc.agent.br_mgr, with mock.patch.object(self.lb_rpc.agent.mgr,
"get_bridge_name") as get_br_fn,\ "get_bridge_name") as get_br_fn,\
mock.patch.object(self.lb_rpc.agent.br_mgr, mock.patch.object(self.lb_rpc.agent.mgr,
"delete_bridge") as del_fn: "delete_bridge") as del_fn:
get_br_fn.return_value = "br0" get_br_fn.return_value = "br0"
self.lb_rpc.network_delete("anycontext", network_id=NETWORK_ID) self.lb_rpc.network_delete("anycontext", network_id=NETWORK_ID)
@ -1254,7 +1253,7 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
def test_port_update(self): def test_port_update(self):
port = {'id': PORT_1} port = {'id': PORT_1}
self.lb_rpc.port_update(context=None, port=port) self.lb_rpc.port_update(context=None, port=port)
self.assertEqual(set([DEVICE_1]), self.lb_rpc.agent.updated_devices) self.assertEqual(set([DEVICE_1]), self.lb_rpc.updated_devices)
def test_network_update(self): def test_network_update(self):
updated_network = {'id': NETWORK_ID} updated_network = {'id': NETWORK_ID}
@ -1262,16 +1261,16 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
NETWORK_ID: [PORT_DATA] NETWORK_ID: [PORT_DATA]
} }
self.lb_rpc.network_update(context=None, network=updated_network) self.lb_rpc.network_update(context=None, network=updated_network)
self.assertEqual(set([DEVICE_1]), self.lb_rpc.agent.updated_devices) self.assertEqual(set([DEVICE_1]), self.lb_rpc.updated_devices)
def test_network_delete_with_existed_brq(self): def test_network_delete_with_existed_brq(self):
mock_net = mock.Mock() mock_net = mock.Mock()
mock_net.physical_network = 'physnet0' mock_net.physical_network = 'physnet0'
self.lb_rpc.agent.br_mgr.network_map = {'123': mock_net} self.lb_rpc.network_map = {'123': mock_net}
with mock.patch.object(linuxbridge_neutron_agent.LOG, 'info') as log,\ with mock.patch.object(linuxbridge_neutron_agent.LOG, 'info') as log,\
mock.patch.object(self.lb_rpc.agent.br_mgr, mock.patch.object(self.lb_rpc.agent.mgr,
"delete_bridge") as del_fn: "delete_bridge") as del_fn:
self.lb_rpc.network_delete("anycontext", network_id="123") self.lb_rpc.network_delete("anycontext", network_id="123")
self.assertEqual(0, del_fn.call_count) self.assertEqual(0, del_fn.call_count)