Merge "ovs-agent: Seperate VLAN mapping outside of the agent"

This commit is contained in:
Jenkins 2016-08-12 04:50:46 +00:00 committed by Gerrit Code Review
commit 739bd92bb0
10 changed files with 439 additions and 185 deletions

View File

@ -90,7 +90,6 @@ class SecurityGroupAgentRpc(object):
self.context = context
self.plugin_rpc = plugin_rpc
self.init_firewall(defer_refresh_firewall, integration_bridge)
self.local_vlan_map = local_vlan_map
def init_firewall(self, defer_refresh_firewall=False,
integration_bridge=None):

View File

@ -22,6 +22,7 @@ from oslo_log import helpers as log_helpers
import six
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent import vlanmanager
@six.add_metaclass(abc.ABCMeta)
@ -222,19 +223,40 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
'''
pass
def get_agent_ports(self, fdb_entries, local_vlan_map):
def _get_lvm_getter(self, local_vlan_map):
def get_lvm_from_mapping(net_id, local_vlan_map):
"""This introduces backward compatibility with local_vlan_map, will
be removed in Ocata.
"""
try:
return local_vlan_map[net_id]
except KeyError:
raise vlanmanager.MappingNotFound(net_id=net_id)
def get_lvm_from_manager(net_id, local_vlan_map):
vlan_manager = vlanmanager.LocalVlanManager()
return vlan_manager.get(net_id)
if local_vlan_map is not None:
vlanmanager.deprecate_local_vlan_map_in_object(
"%s.get_agent_ports()" % self.__class__.__name__)
return get_lvm_from_mapping
return get_lvm_from_manager
def get_agent_ports(self, fdb_entries, local_vlan_map=None):
"""Generator to yield port info.
For each known (i.e found in local_vlan_map) network in
For each known (i.e found in VLAN manager) network in
fdb_entries, yield (lvm, fdb_entries[network_id]['ports']) pair.
:param fdb_entries: l2pop fdb entries
:param local_vlan_map: A dict to map network_id to
the corresponding lvm entry.
:param local_vlan_map: Deprecated.
"""
lvm_getter = self._get_lvm_getter(local_vlan_map)
for network_id, values in fdb_entries.items():
lvm = local_vlan_map.get(network_id)
if lvm is None:
try:
lvm = lvm_getter(network_id, local_vlan_map)
except vlanmanager.MappingNotFound:
continue
agent_ports = values.get('ports')
yield (lvm, agent_ports)
@ -281,7 +303,7 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
@log_helpers.log_method_call
def fdb_chg_ip_tun(self, context, br, fdb_entries, local_ip,
local_vlan_map):
local_vlan_map=None):
'''fdb update when an IP of a port is updated.
The ML2 l2-pop mechanism driver sends an fdb update rpc message when an
@ -305,13 +327,13 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
PortInfo has .mac_address and .ip_address attrs.
:param local_ip: local IP address of this agent.
:param local_vlan_map: A dict to map network_id to
the corresponding lvm entry.
:param local_vlan_map: Deprecated.
'''
lvm_getter = self._get_lvm_getter(local_vlan_map)
for network_id, agent_ports in fdb_entries.items():
lvm = local_vlan_map.get(network_id)
if not lvm:
try:
lvm = lvm_getter(network_id, local_vlan_map)
except vlanmanager.MappingNotFound:
continue
for agent_ip, state in agent_ports.items():

View File

@ -21,6 +21,7 @@ import signal
import sys
import time
import debtcollector
import netaddr
from neutron_lib import constants as n_const
from oslo_config import cfg
@ -58,6 +59,7 @@ from neutron.plugins.ml2.drivers.openvswitch.agent \
import ovs_agent_extension_api as ovs_ext_api
from neutron.plugins.ml2.drivers.openvswitch.agent \
import ovs_dvr_neutron_agent
from neutron.plugins.ml2.drivers.openvswitch.agent import vlanmanager
LOG = logging.getLogger(__name__)
@ -66,31 +68,15 @@ cfg.CONF.import_group('AGENT', 'neutron.plugins.ml2.drivers.openvswitch.'
cfg.CONF.import_group('OVS', 'neutron.plugins.ml2.drivers.openvswitch.agent.'
'common.config')
LocalVLANMapping = debtcollector.moves.moved_class(
vlanmanager.LocalVLANMapping, 'LocalVLANMapping', __name__,
version='Newton', removal_version='Ocata')
class _mac_mydialect(netaddr.mac_unix):
word_fmt = '%.2x'
class LocalVLANMapping(object):
def __init__(self, vlan, network_type, physical_network, segmentation_id,
vif_ports=None):
if vif_ports is None:
vif_ports = {}
self.vlan = vlan
self.network_type = network_type
self.physical_network = physical_network
self.segmentation_id = segmentation_id
self.vif_ports = vif_ports
# set of tunnel ports on which packets should be flooded
self.tun_ofports = set()
def __str__(self):
return ("lv-id = %s type = %s phys-net = %s phys-id = %s" %
(self.vlan, self.network_type, self.physical_network,
self.segmentation_id))
class OVSPluginApi(agent_rpc.PluginApi):
pass
@ -192,7 +178,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.bridge_mappings = self._parse_bridge_mappings(
ovs_conf.bridge_mappings)
self.setup_physical_bridges(self.bridge_mappings)
self.local_vlan_map = {}
self.vlan_manager = vlanmanager.LocalVlanManager()
self._reset_tunnel_ofports()
@ -246,9 +232,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self._restore_local_vlan_map()
# Security group agent support
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
self.sg_plugin_rpc, self.local_vlan_map,
defer_refresh_firewall=True, integration_bridge=self.int_br)
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
self.context, self.sg_plugin_rpc, defer_refresh_firewall=True,
integration_bridge=self.int_br)
# we default to False to provide backward compat with out of tree
# firewall drivers that expect the logic that existed on the Neutron
@ -304,6 +290,12 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.quitting_rpc_timeout = agent_conf.quitting_rpc_timeout
@debtcollector.removals.removed_property(
version='Newton', removal_version='Ocata')
def local_vlan_map(self):
"""Provide backward compatibility with local_vlan_map attribute"""
return self.vlan_manager.mapping
def _parse_bridge_mappings(self, bridge_mappings):
try:
return n_utils.parse_mappings(bridge_mappings)
@ -404,10 +396,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
connection, constants.EXTENSION_DRIVER_TYPE,
self.agent_api)
@debtcollector.moves.moved_method(
'get_net_uuid',
'OVSNeutronAgent.get_net_uuid() moved to vlanmanager.LocalVlanManager',
removal_version='Ocata')
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in six.iteritems(self.local_vlan_map):
if vif_id in vlan_mapping.vif_ports:
return network_id
try:
return self.vlan_manager.get_net_uuid(vif_id)
except vlanmanager.VifIdNotFound:
pass
def port_update(self, context, **kwargs):
port = kwargs.get('port')
@ -512,8 +509,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def fdb_add(self, context, fdb_entries):
LOG.debug("fdb_add received")
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
self.local_vlan_map):
for lvm, agent_ports in self.get_agent_ports(fdb_entries):
agent_ports.pop(self.local_ip, None)
if len(agent_ports):
if not self.enable_distributed_routing:
@ -526,8 +522,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def fdb_remove(self, context, fdb_entries):
LOG.debug("fdb_remove received")
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
self.local_vlan_map):
for lvm, agent_ports in self.get_agent_ports(fdb_entries):
agent_ports.pop(self.local_ip, None)
if len(agent_ports):
if not self.enable_distributed_routing:
@ -575,7 +570,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.debug("update chg_ip received")
with self.tun_br.deferred() as deferred_br:
self.fdb_chg_ip_tun(context, deferred_br, fdb_entries,
self.local_ip, self.local_vlan_map)
self.local_ip)
def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
ip_address):
@ -639,10 +634,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# On a restart or crash of OVS, the network associated with this VLAN
# will already be assigned, so check for that here before assigning a
# new one.
lvm = self.local_vlan_map.get(net_uuid)
if lvm:
try:
lvm = self.vlan_manager.get(net_uuid)
lvid = lvm.vlan
else:
except vlanmanager.MappingNotFound:
lvid = self._local_vlan_hints.pop(net_uuid, None)
if lvid is None:
if not self.available_local_vlans:
@ -650,10 +645,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
net_uuid)
return
lvid = self.available_local_vlans.pop()
self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid,
network_type,
physical_network,
segmentation_id)
self.vlan_manager.add(
net_uuid, lvid, network_type, physical_network,
segmentation_id)
LOG.info(_LI("Assigning %(vlan_id)s as local vlan for "
"net-id=%(net_uuid)s"),
@ -714,8 +708,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
:param net_uuid: the network uuid associated with this vlan.
'''
lvm = self.local_vlan_map.pop(net_uuid, None)
if lvm is None:
try:
lvm = vlanmanager.LocalVlanManager().pop(net_uuid)
except KeyError:
LOG.debug("Network %s not used on agent.", net_uuid)
return
@ -787,10 +782,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
:param device_owner: the string indicative of owner of this port
:param ovs_restarted: indicates if this is called for an OVS restart.
'''
if net_uuid not in self.local_vlan_map or ovs_restarted:
if net_uuid not in self.vlan_manager or ovs_restarted:
self.provision_local_vlan(net_uuid, network_type,
physical_network, segmentation_id)
lvm = self.local_vlan_map[net_uuid]
lvm = self.vlan_manager.get(net_uuid)
lvm.vif_ports[port.vif_id] = port
self.dvr_agent.bind_port_to_dvr(port, lvm,
@ -825,8 +820,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
info_by_port = {x['name']: [x['tag'], x['other_config']]
for x in port_info}
for port_detail in need_binding_ports:
lvm = self.local_vlan_map.get(port_detail['network_id'])
if not lvm:
try:
lvm = self.vlan_manager.get(port_detail['network_id'])
except vlanmanager.MappingNotFound:
continue
port = port_detail['vif_port']
cur_info = info_by_port.get(port.port_name)
@ -845,8 +841,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
"Port", columns=["name", "tag"], ports=port_names, if_exists=True)
tags_by_name = {x['name']: x['tag'] for x in port_info}
for port_detail in need_binding_ports:
lvm = self.local_vlan_map.get(port_detail['network_id'])
if not lvm:
try:
lvm = self.vlan_manager.get(port_detail['network_id'])
except vlanmanager.MappingNotFound:
# network for port was deleted. skip this port since it
# will need to be handled as a DEAD port in the next scan
continue
@ -954,15 +951,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
:param vif_id: the id of the vif
:param net_uuid: the net_uuid this port is associated with.
'''
if net_uuid is None:
net_uuid = self.get_net_uuid(vif_id)
if not self.local_vlan_map.get(net_uuid):
LOG.info(_LI('port_unbound(): net_uuid %s not in local_vlan_map'),
net_uuid)
try:
net_uuid = net_uuid or self.vlan_manager.get_net_uuid(vif_id)
except vlanmanager.VifIdNotFound:
LOG.info(
_LI('port_unbound(): net_uuid %s not managed by VLAN manager'),
net_uuid)
return
lvm = self.local_vlan_map[net_uuid]
lvm = self.vlan_manager.get(net_uuid)
if vif_id in lvm.vif_ports:
vif_port = lvm.vif_ports[vif_id]
@ -1375,7 +1372,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
"""
port_tags = self.int_br.get_port_tag_dict()
changed_ports = set()
for lvm in self.local_vlan_map.values():
for lvm in self.vlan_manager:
for port in lvm.vif_ports.values():
if (
port.port_name in port_tags
@ -1451,7 +1448,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ofports = self.tun_br_ofports[tunnel_type].values()
if ofports and not self.l2_pop:
# Update flooding flows to include the new tunnel
for vlan_mapping in list(self.local_vlan_map.values()):
for vlan_mapping in self.vlan_manager:
if vlan_mapping.network_type == tunnel_type:
br.install_flood_to_tun(vlan_mapping.vlan,
vlan_mapping.segmentation_id,
@ -1471,7 +1468,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
# Check if this tunnel port is still used
for lvm in self.local_vlan_map.values():
for lvm in self.vlan_manager:
if tun_ofport in lvm.tun_ofports:
break
# If not, remove it

View File

@ -0,0 +1,117 @@
# Copyright 2016 Red Hat, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import debtcollector
from neutron_lib import exceptions
from neutron._i18n import _
def deprecate_local_vlan_map_in_object(object_name):
debtcollector.deprecate(
"local_vlan_map argument for %s was deprecated." % object_name,
version="Newton", removal_version="Ocata")
class VifIdNotFound(exceptions.NeutronException):
message = _('VIF ID %(vif_id)s not found in any network managed by '
'VLAN Manager')
class MappingAlreadyExists(exceptions.NeutronException):
message = _('VLAN mapping for network with id %(net_id)s already exists')
class MappingNotFound(exceptions.NeutronException):
message = _('Mapping for network %(net_id)s not found.')
class LocalVLANMapping(object):
def __init__(self, vlan, network_type, physical_network, segmentation_id,
vif_ports=None):
self.vlan = vlan
self.network_type = network_type
self.physical_network = physical_network
self.segmentation_id = segmentation_id
self.vif_ports = vif_ports or {}
# set of tunnel ports on which packets should be flooded
self.tun_ofports = set()
def __str__(self):
return ("lv-id = %s type = %s phys-net = %s phys-id = %s" %
(self.vlan, self.network_type, self.physical_network,
self.segmentation_id))
def __eq__(self, other):
return all(hasattr(other, a) and getattr(self, a) == getattr(other, a)
for a in ['vlan',
'network_type',
'physical_network',
'segmentation_id',
'vif_ports'])
def __hash__(self):
return id(self)
class LocalVlanManager(object):
"""Singleton manager that maps internal VLAN mapping to external network
segmentation ids.
"""
def __new__(cls):
if not hasattr(cls, '_instance'):
cls._instance = super(LocalVlanManager, cls).__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'mapping'):
self.mapping = {}
def __contains__(self, key):
return key in self.mapping
def __iter__(self):
for value in list(self.mapping.values()):
yield value
def items(self):
for item in self.mapping.items():
yield item
def add(self, net_id, vlan, network_type, physical_network,
segmentation_id, vif_ports=None):
if net_id in self.mapping:
raise MappingAlreadyExists(net_id=net_id)
self.mapping[net_id] = LocalVLANMapping(
vlan, network_type, physical_network, segmentation_id, vif_ports)
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in self.mapping.items():
if vif_id in vlan_mapping.vif_ports:
return network_id
raise VifIdNotFound(vif_id=vif_id)
def get(self, net_id):
try:
return self.mapping[net_id]
except KeyError:
raise MappingNotFound(net_id=net_id)
def pop(self, net_id):
try:
return self.mapping.pop(net_id)
except KeyError:
raise MappingNotFound(net_id=net_id)

View File

@ -35,7 +35,6 @@ from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent
from neutron.tests import base
from neutron.tests import tools
from neutron.tests.unit.extensions import test_securitygroup as test_sg
@ -3121,12 +3120,8 @@ class TestSecurityGroupAgentWithOVSIptables(
test_rpc_v1_1)
def _init_agent(self, defer_refresh_firewall):
fake_map = ovs_neutron_agent.LocalVLANMapping(1, 'network_type',
'physical_network', 1)
local_vlan_map = {'fakenet': fake_map}
self.agent = sg_rpc.SecurityGroupAgentRpc(
context=None, plugin_rpc=self.rpc,
local_vlan_map=local_vlan_map,
defer_refresh_firewall=defer_refresh_firewall)
self._enforce_order_in_firewall(self.agent.firewall)

View File

@ -20,8 +20,9 @@ import mock
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent
from neutron.tests import base
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent import \
test_vlanmanager
class FakeNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin):
@ -53,6 +54,8 @@ class TestL2populationRpcCallBackTunnelMixinBase(base.BaseTestCase):
def setUp(self):
super(TestL2populationRpcCallBackTunnelMixinBase, self).setUp()
self.vlan_manager = self.useFixture(
test_vlanmanager.LocalVlanManagerFixture()).manager
self.fakeagent = FakeNeutronAgent()
self.fakebr = mock.Mock()
Port = collections.namedtuple('Port', 'ip, ofport')
@ -112,21 +115,13 @@ class TestL2populationRpcCallBackTunnelMixinBase(base.BaseTestCase):
},
}
self.lvm1 = ovs_neutron_agent.LocalVLANMapping(
self.lvms[0].vlan, self.type_gre, self.lvms[0].phys,
self.lvms[0].segid, {self.lvms[0].vif: self.lvms[0].port})
self.lvm2 = ovs_neutron_agent.LocalVLANMapping(
self.lvms[1].vlan, self.type_gre, self.lvms[1].phys,
self.lvms[1].segid, {self.lvms[1].vif: self.lvms[1].port})
self.lvm3 = ovs_neutron_agent.LocalVLANMapping(
self.lvms[2].vlan, self.type_gre, self.lvms[2].phys,
self.lvms[2].segid, {self.lvms[2].vif: self.lvms[2].port})
self.local_vlan_map1 = {
self.lvms[0].net: self.lvm1,
self.lvms[1].net: self.lvm2,
self.lvms[2].net: self.lvm3,
}
for i in range(3):
self.vlan_manager.add(
self.lvms[i].net,
self.lvms[i].vlan, self.type_gre, self.lvms[i].phys,
self.lvms[i].segid, {self.lvms[i].vif: self.lvms[i].port})
setattr(self, 'lvm%d' % i,
self.vlan_manager.get(self.lvms[i].net))
self.upd_fdb_entry1_val = {
self.lvms[0].net: {

View File

@ -25,20 +25,22 @@ class TestL2populationRpcCallBackTunnelMixin(
l2population_rpc_base.TestL2populationRpcCallBackTunnelMixinBase):
def test_get_agent_ports_no_data(self):
# Make sure vlan manager has no mappings that were added in setUp()
self.vlan_manager.mapping = {}
self.assertFalse(
list(self.fakeagent.get_agent_ports(self.fdb_entries1, {})))
list(self.fakeagent.get_agent_ports(self.fdb_entries1)))
def test_get_agent_ports_non_existence_key_in_lvm(self):
results = {}
del self.local_vlan_map1[self.lvms[1].net]
self.vlan_manager.pop(self.lvms[1].net)
for lvm, agent_ports in self.fakeagent.get_agent_ports(
self.fdb_entries1, self.local_vlan_map1):
self.fdb_entries1):
results[lvm] = agent_ports
expected = {
self.lvm1: {
self.lvm0: {
self.ports[0].ip: [(self.lvms[0].mac, self.lvms[0].ip)],
self.local_ip: []},
self.lvm3: {
self.lvm2: {
self.ports[2].ip: [(self.lvms[2].mac, self.lvms[2].ip)],
self.local_ip: []},
}
@ -48,14 +50,14 @@ class TestL2populationRpcCallBackTunnelMixin(
results = {}
self.fdb_entries1[self.lvms[1].net]['ports'] = {}
for lvm, agent_ports in self.fakeagent.get_agent_ports(
self.fdb_entries1, self.local_vlan_map1):
self.fdb_entries1):
results[lvm] = agent_ports
expected = {
self.lvm1: {
self.lvm0: {
self.ports[0].ip: [(self.lvms[0].mac, self.lvms[0].ip)],
self.local_ip: []},
self.lvm2: {},
self.lvm3: {
self.lvm1: {},
self.lvm2: {
self.ports[2].ip: [(self.lvms[2].mac, self.lvms[2].ip)],
self.local_ip: []},
}
@ -65,40 +67,40 @@ class TestL2populationRpcCallBackTunnelMixin(
with mock.patch.object(self.fakeagent, 'setup_tunnel_port'),\
mock.patch.object(self.fakeagent, 'add_fdb_flow'
) as mock_add_fdb_flow:
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr, (self.lvms[1].mac, self.lvms[1].ip),
self.ports[1].ip, self.lvm1, self.ports[1].ofport),
self.ports[1].ip, self.lvm0, self.ports[1].ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_add_fdb_flow.call_args_list))
def test_fdb_add_tun_non_existence_key_in_ofports(self):
ofport = self.lvm1.network_type + '0a0a0a0a'
ofport = self.lvm0.network_type + '0a0a0a0a'
del self.ofports[self.type_gre][self.ports[1].ip]
with mock.patch.object(self.fakeagent, 'setup_tunnel_port',
return_value=ofport
) as mock_setup_tunnel_port,\
mock.patch.object(self.fakeagent, 'add_fdb_flow'
) as mock_add_fdb_flow:
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
mock_setup_tunnel_port.assert_called_once_with(
self.fakebr, self.ports[1].ip, self.lvm1.network_type)
self.fakebr, self.ports[1].ip, self.lvm0.network_type)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr, (self.lvms[1].mac, self.lvms[1].ip),
self.ports[1].ip, self.lvm1, ofport),
self.ports[1].ip, self.lvm0, ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_add_fdb_flow.call_args_list))
@ -110,16 +112,16 @@ class TestL2populationRpcCallBackTunnelMixin(
) as mock_setup_tunnel_port,\
mock.patch.object(self.fakeagent, 'add_fdb_flow'
) as mock_add_fdb_flow:
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
mock_setup_tunnel_port.assert_called_once_with(
self.fakebr, self.ports[1].ip, self.lvm1.network_type)
self.fakebr, self.ports[1].ip, self.lvm0.network_type)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_add_fdb_flow.call_args_list))
@ -127,16 +129,16 @@ class TestL2populationRpcCallBackTunnelMixin(
def test_fdb_remove_tun(self):
with mock.patch.object(
self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr, (self.lvms[1].mac, self.lvms[1].ip),
self.ports[1].ip, self.lvm1, self.ports[1].ofport),
self.ports[1].ip, self.lvm0, self.ports[1].ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_del_fdb_flow.call_args_list))
@ -147,35 +149,35 @@ class TestL2populationRpcCallBackTunnelMixin(
) as mock_del_fdb_flow,\
mock.patch.object(self.fakeagent, 'cleanup_tunnel_port'
) as mock_cleanup_tunnel_port:
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr,
(n_const.FLOODING_ENTRY[0], n_const.FLOODING_ENTRY[1]),
self.ports[1].ip, self.lvm1, self.ports[1].ofport),
self.ports[1].ip, self.lvm0, self.ports[1].ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_del_fdb_flow.call_args_list))
mock_cleanup_tunnel_port.assert_called_once_with(
self.fakebr, self.ports[1].ofport, self.lvm1.network_type)
self.fakebr, self.ports[1].ofport, self.lvm0.network_type)
def test_fdb_remove_tun_non_existence_key_in_ofports(self):
del self.ofports[self.type_gre][self.ports[1].ip]
with mock.patch.object(
self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_del_fdb_flow.call_args_list))
@ -193,23 +195,24 @@ class TestL2populationRpcCallBackTunnelMixin(
'context', self.upd_fdb_entry1)
def test__fdb_chg_ip(self):
m_setup_entry_for_arp_reply = mock.Mock()
self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
self.upd_fdb_entry1_val, self.local_ip,
self.local_vlan_map1)
with mock.patch.object(
self.fakeagent,
'setup_entry_for_arp_reply') as m_setup_entry_for_arp_reply:
self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
self.upd_fdb_entry1_val,
self.local_ip)
expected = [
mock.call(self.fakebr, 'remove', self.lvm1.vlan, self.lvms[0].mac,
mock.call(self.fakebr, 'remove', self.lvm0.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[1].mac,
mock.call(self.fakebr, 'add', self.lvm0.vlan, self.lvms[1].mac,
self.lvms[1].ip),
mock.call(self.fakebr, 'remove', self.lvm0.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call(self.fakebr, 'add', self.lvm0.vlan, self.lvms[1].mac,
self.lvms[1].ip),
mock.call(self.fakebr, 'remove', self.lvm1.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[1].mac,
self.lvms[1].ip),
mock.call(self.fakebr, 'remove', self.lvm2.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call(self.fakebr, 'add', self.lvm2.vlan, self.lvms[2].mac,
mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[2].mac,
self.lvms[2].ip),
]
m_setup_entry_for_arp_reply.assert_has_calls(expected, any_order=True)
@ -233,8 +236,7 @@ class TestL2populationRpcCallBackTunnelMixin(
m_setup_entry_for_arp_reply = mock.Mock()
self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
upd_fdb_entry_val, self.local_ip,
self.local_vlan_map1)
upd_fdb_entry_val, self.local_ip)
self.assertFalse(m_setup_entry_for_arp_reply.call_count)
def test_fdb_chg_ip_tun_empty_before_after(self):
@ -247,6 +249,5 @@ class TestL2populationRpcCallBackTunnelMixin(
self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
# passing non-local ip
self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
upd_fdb_entry_val, "8.8.8.8",
self.local_vlan_map1)
upd_fdb_entry_val, "8.8.8.8")
self.assertFalse(m_setup_entry_for_arp_reply.call_count)

View File

@ -36,6 +36,8 @@ from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
from neutron.tests import base
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
import ovs_test_base
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
import test_vlanmanager
NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
@ -97,6 +99,7 @@ class TestOvsNeutronAgent(object):
def setUp(self):
super(TestOvsNeutronAgent, self).setUp()
self.useFixture(test_vlanmanager.LocalVlanManagerFixture())
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
@ -148,9 +151,8 @@ class TestOvsNeutronAgent(object):
fixed_ips = [{'subnet_id': 'my-subnet-uuid',
'ip_address': '1.1.1.1'}]
if old_local_vlan is not None:
self.agent.local_vlan_map[net_uuid] = (
self.mod_agent.LocalVLANMapping(
old_local_vlan, None, None, None))
self.agent.vlan_manager.add(
net_uuid, old_local_vlan, None, None, None)
with mock.patch.object(self.agent, 'int_br', autospec=True) as int_br:
int_br.db_get_val.return_value = db_get_val
int_br.set_db_attribute.return_value = True
@ -603,9 +605,8 @@ class TestOvsNeutronAgent(object):
br = self.br_int_cls('br-int')
mac = "ca:fe:de:ad:be:ef"
port = ovs_lib.VifPort(1, 1, 1, mac, br)
lvm = self.mod_agent.LocalVLANMapping(
1, '1', None, 1, {port.vif_id: port})
local_vlan_map = {'1': lvm}
self.agent.vlan_manager.add(
'1', 1, '1', None, 1, {port.vif_id: port})
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
port_tags_dict = {1: []}
@ -613,8 +614,7 @@ class TestOvsNeutronAgent(object):
added=set([3]), current=vif_port_set,
removed=set([2]), updated=set([1])
)
with mock.patch.dict(self.agent.local_vlan_map, local_vlan_map),\
mock.patch.object(self.agent, 'tun_br', autospec=True):
with mock.patch.object(self.agent, 'tun_br', autospec=True):
actual = self.mock_scan_ports(
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
self.assertEqual(expected, actual)
@ -646,8 +646,9 @@ class TestOvsNeutronAgent(object):
new_failed_devices_retries_map)
def test_add_port_tag_info(self):
self.agent.local_vlan_map["net1"] = mock.Mock()
self.agent.local_vlan_map["net1"].vlan = "1"
lvm = mock.Mock()
lvm.vlan = "1"
self.agent.vlan_manager.mapping["net1"] = lvm
ovs_db_list = [{'name': 'tap1',
'tag': [],
'other_config': {'segmentation_id': '1'}},
@ -682,7 +683,7 @@ class TestOvsNeutronAgent(object):
def test_bind_devices(self):
devices_up = ['tap1']
devices_down = ['tap2']
self.agent.local_vlan_map["net1"] = mock.Mock()
self.agent.vlan_manager.mapping["net1"] = mock.Mock()
ovs_db_list = [{'name': 'tap1', 'tag': []},
{'name': 'tap2', 'tag': []}]
vif_port1 = mock.Mock()
@ -714,8 +715,7 @@ class TestOvsNeutronAgent(object):
self.agent.prevent_arp_spoofing = enable_prevent_arp_spoofing
ovs_db_list = [{'name': 'fake_device', 'tag': []}]
self.agent.local_vlan_map = {
'fake_network': ovs_agent.LocalVLANMapping(1, None, None, 1)}
self.agent.vlan_manager.add('fake_network', 1, None, None, 1)
vif_port = mock.Mock()
vif_port.port_name = 'fake_device'
vif_port.ofport = 1
@ -1423,7 +1423,7 @@ class TestOvsNeutronAgent(object):
lvm = mock.Mock()
lvm.network_type = "gre"
lvm.vif_ports = {"vif1": mock.Mock()}
self.agent.local_vlan_map["netuid12345"] = lvm
self.agent.vlan_manager.mapping["netuid12345"] = lvm
self.agent.port_unbound("vif1", "netuid12345")
self.assertTrue(reclvl_fn.called)
@ -1446,7 +1446,7 @@ class TestOvsNeutronAgent(object):
lvm2.vlan = 'vlan2'
lvm2.segmentation_id = 'seg2'
lvm2.tun_ofports = set(['1', '2'])
self.agent.local_vlan_map = {'net1': lvm1, 'net2': lvm2}
self.agent.vlan_manager.mapping = {'net1': lvm1, 'net2': lvm2}
self.agent.tun_br_ofports = {'gre':
{'1.1.1.1': '1', '2.2.2.2': '2'}}
self.agent.arp_responder_enabled = True
@ -2050,19 +2050,17 @@ class TestOvsNeutronAgent(object):
"""
def add_new_vlan_mapping(*args, **kwargs):
self.agent.local_vlan_map['bar'] = (
self.mod_agent.LocalVLANMapping(1, 2, 3, 4))
self.agent.vlan_manager.add('bar', 1, 2, 3, 4)
bridge = mock.Mock()
tunnel_type = 'vxlan'
self.agent.tun_br_ofports = {tunnel_type: dict()}
self.agent.l2_pop = False
self.agent.local_vlan_map = {
'foo': self.mod_agent.LocalVLANMapping(4, tunnel_type, 2, 1)}
self.agent.vlan_manager.add('foo', 4, tunnel_type, 2, 1)
self.agent.local_ip = '2.3.4.5'
bridge.install_flood_to_tun.side_effect = add_new_vlan_mapping
self.agent._setup_tunnel_port(bridge, 1, '1.2.3.4',
tunnel_type=tunnel_type)
self.assertIn('bar', self.agent.local_vlan_map)
self.assertIn('bar', self.agent.vlan_manager)
def test_setup_entry_for_arp_reply_ignores_ipv6_addresses(self):
self.agent.arp_responder_enabled = True
@ -2296,7 +2294,8 @@ class TestOvsDvrNeutronAgent(object):
self.agent.patch_tun_ofport = 1
self.agent.patch_int_ofport = 2
self.agent.dvr_agent.local_ports = {}
self.agent.local_vlan_map = {}
self.agent.vlan_manager = self.useFixture(
test_vlanmanager.LocalVlanManagerFixture()).manager
self.agent.dvr_agent.enable_distributed_routing = True
self.agent.dvr_agent.enable_tunneling = True
self.agent.dvr_agent.patch_tun_ofport = 1
@ -2397,7 +2396,7 @@ class TestOvsDvrNeutronAgent(object):
n_const.DEVICE_OWNER_DVR_INTERFACE, False)
phy_ofp = self.agent.dvr_agent.phys_ofports[physical_network]
int_ofp = self.agent.dvr_agent.int_ofports[physical_network]
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
expected_on_phys_br = [
mock.call.provision_local_vlan(
port=phy_ofp,
@ -2485,7 +2484,7 @@ class TestOvsDvrNeutronAgent(object):
self._port, self._net_uuid, network_type,
physical_network, segmentation_id, self._fixed_ips,
n_const.DEVICE_OWNER_DVR_INTERFACE, False)
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
expected_on_int_br = self._expected_port_bound(
self._port, lvid)
expected_on_tun_br = [
@ -2567,7 +2566,7 @@ class TestOvsDvrNeutronAgent(object):
def test_port_bound_for_dvr_with_csnat_ports(self):
self._setup_for_dvr_test()
int_br, tun_br = self._port_bound_for_dvr_with_csnat_ports()
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
expected_on_int_br = [
mock.call.install_dvr_to_src_mac(
network_type='vxlan',
@ -2594,7 +2593,7 @@ class TestOvsDvrNeutronAgent(object):
# simulate a replug
self._port.ofport = 12
int_br, tun_br = self._port_bound_for_dvr_with_csnat_ports()
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
expected_on_int_br = [
mock.call.delete_dvr_to_src_mac(
network_type='vxlan',
@ -2715,7 +2714,7 @@ class TestOvsDvrNeutronAgent(object):
None, None, self._fixed_ips,
n_const.DEVICE_OWNER_DVR_INTERFACE,
False)
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
self.assertEqual(self._expected_port_bound(self._port, lvid),
int_br.mock_calls)
expected_on_tun_br = [
@ -2750,7 +2749,7 @@ class TestOvsDvrNeutronAgent(object):
failed_devices = {'added': set(), 'removed': set()}
failed_devices['removed'] = self.agent.treat_devices_removed(
[self._port.vif_id])
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
if ip_version == 4:
expected = [
mock.call.delete_dvr_process_ipv4(
@ -2812,7 +2811,7 @@ class TestOvsDvrNeutronAgent(object):
None, None, self._fixed_ips,
n_const.DEVICE_OWNER_DVR_INTERFACE,
False)
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
self.assertEqual(
self._expected_port_bound(self._port, lvid),
int_br.mock_calls)
@ -2927,7 +2926,7 @@ class TestOvsDvrNeutronAgent(object):
None, None, self._fixed_ips,
n_const.DEVICE_OWNER_ROUTER_SNAT,
False)
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
expected_on_int_br = [
mock.call.install_dvr_to_src_mac(
network_type='vxlan',

View File

@ -28,6 +28,8 @@ from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
import ovs_test_base
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
import test_vlanmanager
def nonzero(f):
@ -74,6 +76,7 @@ class TunnelTest(object):
def setUp(self):
super(TunnelTest, self).setUp()
self.useFixture(test_vlanmanager.LocalVlanManagerFixture())
conn_patcher = mock.patch(
'neutron.agent.ovsdb.native.connection.Connection.start')
conn_patcher.start()
@ -93,11 +96,11 @@ class TunnelTest(object):
self.MAP_TUN_INT_OFPORT = 33333
self.MAP_TUN_PHY_OFPORT = 44444
self.LVM = self.mod_agent.LocalVLANMapping(
self.LVM_DATA = (
LV_ID, 'gre', None, LS_ID, VIF_PORTS)
self.LVM_FLAT = self.mod_agent.LocalVLANMapping(
self.LVM_FLAT_DATA = (
LV_ID, 'flat', 'net1', LS_ID, VIF_PORTS)
self.LVM_VLAN = self.mod_agent.LocalVLANMapping(
self.LVM_VLAN_DATA = (
LV_ID, 'vlan', 'net1', LS_ID, VIF_PORTS)
self.inta = mock.Mock()
@ -420,16 +423,16 @@ class TunnelTest(object):
a = self._build_agent()
a.available_local_vlans = set()
a.local_vlan_map[NET_UUID] = self.LVM
a.vlan_manager.add(NET_UUID, *self.LVM_DATA)
a.reclaim_local_vlan(NET_UUID)
self.assertIn(self.LVM.vlan, a.available_local_vlans)
self.assertIn(self.LVM_DATA[0], a.available_local_vlans)
self._verify_mock_calls()
def test_reclaim_local_vlan_flat(self):
self.mock_map_tun_bridge_expected.append(
mock.call.reclaim_local_vlan(
port=self.MAP_TUN_PHY_OFPORT,
lvid=self.LVM_FLAT.vlan))
lvid=self.LVM_FLAT_DATA[0]))
self.mock_int_bridge_expected.append(
mock.call.reclaim_local_vlan(
port=self.INT_OFPORT,
@ -440,16 +443,16 @@ class TunnelTest(object):
a.int_ofports['net1'] = self.INT_OFPORT
a.available_local_vlans = set()
a.local_vlan_map[NET_UUID] = self.LVM_FLAT
a.vlan_manager.add(NET_UUID, *self.LVM_FLAT_DATA)
a.reclaim_local_vlan(NET_UUID)
self.assertIn(self.LVM_FLAT.vlan, a.available_local_vlans)
self.assertIn(self.LVM_FLAT_DATA[0], a.available_local_vlans)
self._verify_mock_calls()
def test_reclaim_local_vlan_vlan(self):
self.mock_map_tun_bridge_expected.append(
mock.call.reclaim_local_vlan(
port=self.MAP_TUN_PHY_OFPORT,
lvid=self.LVM_VLAN.vlan))
lvid=self.LVM_VLAN_DATA[0]))
self.mock_int_bridge_expected.append(
mock.call.reclaim_local_vlan(
port=self.INT_OFPORT,
@ -460,9 +463,9 @@ class TunnelTest(object):
a.int_ofports['net1'] = self.INT_OFPORT
a.available_local_vlans = set()
a.local_vlan_map[NET_UUID] = self.LVM_VLAN
a.vlan_manager.add(NET_UUID, *self.LVM_VLAN_DATA)
a.reclaim_local_vlan(NET_UUID)
self.assertIn(self.LVM_VLAN.vlan, a.available_local_vlans)
self.assertIn(self.LVM_VLAN_DATA[0], a.available_local_vlans)
self._verify_mock_calls()
def test_port_bound(self):
@ -477,7 +480,7 @@ class TunnelTest(object):
vlan_mapping)]
a = self._build_agent()
a.local_vlan_map[NET_UUID] = self.LVM
a.vlan_manager.add(NET_UUID, *self.LVM_DATA)
a.local_dvr_map = {}
self.ovs_bridges[self.INT_BRIDGE].db_get_val.return_value = {}
a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID,
@ -488,7 +491,7 @@ class TunnelTest(object):
with mock.patch.object(self.mod_agent.OVSNeutronAgent,
'reclaim_local_vlan') as reclaim_local_vlan:
a = self._build_agent()
a.local_vlan_map[NET_UUID] = self.LVM
a.vlan_manager.add(NET_UUID, *self.LVM_DATA)
a.port_unbound(VIF_ID, NET_UUID)
reclaim_local_vlan.assert_called_once_with(NET_UUID)
@ -507,7 +510,7 @@ class TunnelTest(object):
a = self._build_agent()
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = self.LVM
a.vlan_manager.add(NET_UUID, *self.LVM_DATA)
self.ovs_bridges[self.INT_BRIDGE].db_get_val.return_value = mock.Mock()
a.port_dead(VIF_PORT)
self._verify_mock_calls()

View File

@ -0,0 +1,126 @@
# Copyright 2016 Red Hat, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import testtools
from neutron.plugins.ml2.drivers.openvswitch.agent import vlanmanager
from neutron.tests import base
class LocalVlanManagerFixture(fixtures.Fixture):
def _setUp(self):
super(LocalVlanManagerFixture, self)._setUp()
self.vlan_manager = vlanmanager.LocalVlanManager()
self.addCleanup(self.restore_manager)
# Remove _instance attribute from VlanManager in order to not obtain a
# singleton
del vlanmanager.LocalVlanManager._instance
self.manager = vlanmanager.LocalVlanManager()
def restore_manager(self):
vlanmanager.LocalVlanManager._instance = self.vlan_manager
class TestLocalVLANMapping(base.BaseTestCase):
def test___eq___equal(self):
mapping1 = vlanmanager.LocalVLANMapping(1, 2, 3, 4, 5)
mapping2 = vlanmanager.LocalVLANMapping(1, 2, 3, 4, 5)
self.assertEqual(mapping1, mapping2)
def test___eq___different(self):
mapping1 = vlanmanager.LocalVLANMapping(1, 2, 3, 4, 5)
mapping2 = vlanmanager.LocalVLANMapping(1, 2, 4, 4, 5)
self.assertNotEqual(mapping1, mapping2)
def test___eq___different_type(self):
mapping = vlanmanager.LocalVLANMapping(1, 2, 3, 4, 5)
self.assertNotEqual(mapping, "foo")
class TestLocalVlanManager(base.BaseTestCase):
def setUp(self):
super(TestLocalVlanManager, self).setUp()
self.vlan_manager = self.useFixture(LocalVlanManagerFixture()).manager
def test_is_singleton(self):
self.vlan_manager.add(1, None, None, None, None)
new_vlan_manager = vlanmanager.LocalVlanManager()
self.assertIs(new_vlan_manager, self.vlan_manager)
self.assertItemsEqual(new_vlan_manager.mapping,
self.vlan_manager.mapping)
def test_in_operator_on_key(self):
self.vlan_manager.add(1, None, None, None, None)
self.assertIn(1, self.vlan_manager)
self.assertNotIn(2, self.vlan_manager)
def test_iterator_returns_vlan_mappings(self):
created_vlans = []
for val in range(3):
self.vlan_manager.add(val, val, val, val, val)
created_vlans.append(self.vlan_manager.get(val))
self.assertItemsEqual(created_vlans, list(self.vlan_manager))
def test_get_net_uuid_existing(self):
port_id = 'port-id'
vlan_data = (2, 3, 4, 5, {port_id: 'port'})
net_id = 1
self.vlan_manager.add(net_id, *vlan_data)
obtained_net_id = self.vlan_manager.get_net_uuid(port_id)
self.assertEqual(net_id, obtained_net_id)
def test_get_net_uuid_non_existing_raises_exception(self):
vlan_data = (1, 2, 3, 4, 5, {'port_id': 'port'})
self.vlan_manager.add(*vlan_data)
with testtools.ExpectedException(vlanmanager.VifIdNotFound):
self.vlan_manager.get_net_uuid('non-existing-port')
def test_add_and_get(self):
vlan_data = (2, 3, 4, 5, 6)
expected_vlan_mapping = vlanmanager.LocalVLANMapping(*vlan_data)
self.vlan_manager.add(1, *vlan_data)
vlan_mapping = self.vlan_manager.get(1)
self.assertEqual(expected_vlan_mapping, vlan_mapping)
def test_add_existing_raises_exception(self):
vlan_data = (2, 3, 4, 5, 6)
self.vlan_manager.add(1, *vlan_data)
with testtools.ExpectedException(vlanmanager.MappingAlreadyExists):
self.vlan_manager.add(1, *vlan_data)
def test_get_non_existing_raises_keyerror(self):
with testtools.ExpectedException(vlanmanager.MappingNotFound):
self.vlan_manager.get(1)
def test_pop(self):
vlan_data = (2, 3, 4, 5, 6)
expected_vlan_mapping = vlanmanager.LocalVLANMapping(*vlan_data)
self.vlan_manager.add(1, *vlan_data)
vlan_mapping = self.vlan_manager.pop(1)
self.assertEqual(expected_vlan_mapping, vlan_mapping)
self.assertFalse(self.vlan_manager.mapping)
def test_pop_non_existing_raises_exception(self):
with testtools.ExpectedException(vlanmanager.MappingNotFound):
self.vlan_manager.pop(1)
class TestDeprecationMessage(base.BaseTestCase):
def test_deprecation_message(self):
"""Test that calling function doesn't crash"""
vlanmanager.deprecate_local_vlan_map_in_object("foo")