ovs-agent: Seperate VLAN mapping outside of the agent

Previously, local mapping from tunnel ids or vlan ids to internal vlans
was held in ovs agent itself not exposing this mapping outside. This
patch itroduces a singleton object in memory providing needed interface
for handling local vlan mappings.

Partially-implements: blueprint vlan-aware-vms
Partially-implements: blueprint l2-api-extensions

Change-Id: I514c7632c1c26d6cfeb706fc5d829a46dcce3782
This commit is contained in:
Jakub Libosvar 2016-08-02 09:00:40 -04:00
parent 17d85e4748
commit a0543fda17
10 changed files with 439 additions and 185 deletions

View File

@ -90,7 +90,6 @@ class SecurityGroupAgentRpc(object):
self.context = context
self.plugin_rpc = plugin_rpc
self.init_firewall(defer_refresh_firewall, integration_bridge)
self.local_vlan_map = local_vlan_map
def init_firewall(self, defer_refresh_firewall=False,
integration_bridge=None):

View File

@ -22,6 +22,7 @@ from oslo_log import helpers as log_helpers
import six
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent import vlanmanager
@six.add_metaclass(abc.ABCMeta)
@ -222,19 +223,40 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
'''
pass
def get_agent_ports(self, fdb_entries, local_vlan_map):
def _get_lvm_getter(self, local_vlan_map):
def get_lvm_from_mapping(net_id, local_vlan_map):
"""This introduces backward compatibility with local_vlan_map, will
be removed in Ocata.
"""
try:
return local_vlan_map[net_id]
except KeyError:
raise vlanmanager.MappingNotFound(net_id=net_id)
def get_lvm_from_manager(net_id, local_vlan_map):
vlan_manager = vlanmanager.LocalVlanManager()
return vlan_manager.get(net_id)
if local_vlan_map is not None:
vlanmanager.deprecate_local_vlan_map_in_object(
"%s.get_agent_ports()" % self.__class__.__name__)
return get_lvm_from_mapping
return get_lvm_from_manager
def get_agent_ports(self, fdb_entries, local_vlan_map=None):
"""Generator to yield port info.
For each known (i.e found in local_vlan_map) network in
For each known (i.e found in VLAN manager) network in
fdb_entries, yield (lvm, fdb_entries[network_id]['ports']) pair.
:param fdb_entries: l2pop fdb entries
:param local_vlan_map: A dict to map network_id to
the corresponding lvm entry.
:param local_vlan_map: Deprecated.
"""
lvm_getter = self._get_lvm_getter(local_vlan_map)
for network_id, values in fdb_entries.items():
lvm = local_vlan_map.get(network_id)
if lvm is None:
try:
lvm = lvm_getter(network_id, local_vlan_map)
except vlanmanager.MappingNotFound:
continue
agent_ports = values.get('ports')
yield (lvm, agent_ports)
@ -281,7 +303,7 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
@log_helpers.log_method_call
def fdb_chg_ip_tun(self, context, br, fdb_entries, local_ip,
local_vlan_map):
local_vlan_map=None):
'''fdb update when an IP of a port is updated.
The ML2 l2-pop mechanism driver sends an fdb update rpc message when an
@ -305,13 +327,13 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
PortInfo has .mac_address and .ip_address attrs.
:param local_ip: local IP address of this agent.
:param local_vlan_map: A dict to map network_id to
the corresponding lvm entry.
:param local_vlan_map: Deprecated.
'''
lvm_getter = self._get_lvm_getter(local_vlan_map)
for network_id, agent_ports in fdb_entries.items():
lvm = local_vlan_map.get(network_id)
if not lvm:
try:
lvm = lvm_getter(network_id, local_vlan_map)
except vlanmanager.MappingNotFound:
continue
for agent_ip, state in agent_ports.items():

View File

@ -21,6 +21,7 @@ import signal
import sys
import time
import debtcollector
import netaddr
from neutron_lib import constants as n_const
from oslo_config import cfg
@ -58,6 +59,7 @@ from neutron.plugins.ml2.drivers.openvswitch.agent \
import ovs_agent_extension_api as ovs_ext_api
from neutron.plugins.ml2.drivers.openvswitch.agent \
import ovs_dvr_neutron_agent
from neutron.plugins.ml2.drivers.openvswitch.agent import vlanmanager
LOG = logging.getLogger(__name__)
@ -66,31 +68,15 @@ cfg.CONF.import_group('AGENT', 'neutron.plugins.ml2.drivers.openvswitch.'
cfg.CONF.import_group('OVS', 'neutron.plugins.ml2.drivers.openvswitch.agent.'
'common.config')
LocalVLANMapping = debtcollector.moves.moved_class(
vlanmanager.LocalVLANMapping, 'LocalVLANMapping', __name__,
version='Newton', removal_version='Ocata')
class _mac_mydialect(netaddr.mac_unix):
word_fmt = '%.2x'
class LocalVLANMapping(object):
def __init__(self, vlan, network_type, physical_network, segmentation_id,
vif_ports=None):
if vif_ports is None:
vif_ports = {}
self.vlan = vlan
self.network_type = network_type
self.physical_network = physical_network
self.segmentation_id = segmentation_id
self.vif_ports = vif_ports
# set of tunnel ports on which packets should be flooded
self.tun_ofports = set()
def __str__(self):
return ("lv-id = %s type = %s phys-net = %s phys-id = %s" %
(self.vlan, self.network_type, self.physical_network,
self.segmentation_id))
class OVSPluginApi(agent_rpc.PluginApi):
pass
@ -192,7 +178,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.bridge_mappings = self._parse_bridge_mappings(
ovs_conf.bridge_mappings)
self.setup_physical_bridges(self.bridge_mappings)
self.local_vlan_map = {}
self.vlan_manager = vlanmanager.LocalVlanManager()
self._reset_tunnel_ofports()
@ -245,9 +231,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self._restore_local_vlan_map()
# Security group agent support
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
self.sg_plugin_rpc, self.local_vlan_map,
defer_refresh_firewall=True, integration_bridge=self.int_br)
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
self.context, self.sg_plugin_rpc, defer_refresh_firewall=True,
integration_bridge=self.int_br)
# we default to False to provide backward compat with out of tree
# firewall drivers that expect the logic that existed on the Neutron
@ -303,6 +289,12 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.quitting_rpc_timeout = agent_conf.quitting_rpc_timeout
@debtcollector.removals.removed_property(
version='Newton', removal_version='Ocata')
def local_vlan_map(self):
"""Provide backward compatibility with local_vlan_map attribute"""
return self.vlan_manager.mapping
def _parse_bridge_mappings(self, bridge_mappings):
try:
return n_utils.parse_mappings(bridge_mappings)
@ -403,10 +395,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
connection, constants.EXTENSION_DRIVER_TYPE,
self.agent_api)
@debtcollector.moves.moved_method(
'get_net_uuid',
'OVSNeutronAgent.get_net_uuid() moved to vlanmanager.LocalVlanManager',
removal_version='Ocata')
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in six.iteritems(self.local_vlan_map):
if vif_id in vlan_mapping.vif_ports:
return network_id
try:
return self.vlan_manager.get_net_uuid(vif_id)
except vlanmanager.VifIdNotFound:
pass
def port_update(self, context, **kwargs):
port = kwargs.get('port')
@ -511,8 +508,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def fdb_add(self, context, fdb_entries):
LOG.debug("fdb_add received")
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
self.local_vlan_map):
for lvm, agent_ports in self.get_agent_ports(fdb_entries):
agent_ports.pop(self.local_ip, None)
if len(agent_ports):
if not self.enable_distributed_routing:
@ -525,8 +521,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def fdb_remove(self, context, fdb_entries):
LOG.debug("fdb_remove received")
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
self.local_vlan_map):
for lvm, agent_ports in self.get_agent_ports(fdb_entries):
agent_ports.pop(self.local_ip, None)
if len(agent_ports):
if not self.enable_distributed_routing:
@ -574,7 +569,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.debug("update chg_ip received")
with self.tun_br.deferred() as deferred_br:
self.fdb_chg_ip_tun(context, deferred_br, fdb_entries,
self.local_ip, self.local_vlan_map)
self.local_ip)
def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
ip_address):
@ -638,10 +633,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# On a restart or crash of OVS, the network associated with this VLAN
# will already be assigned, so check for that here before assigning a
# new one.
lvm = self.local_vlan_map.get(net_uuid)
if lvm:
try:
lvm = self.vlan_manager.get(net_uuid)
lvid = lvm.vlan
else:
except vlanmanager.MappingNotFound:
lvid = self._local_vlan_hints.pop(net_uuid, None)
if lvid is None:
if not self.available_local_vlans:
@ -649,10 +644,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
net_uuid)
return
lvid = self.available_local_vlans.pop()
self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid,
network_type,
physical_network,
segmentation_id)
self.vlan_manager.add(
net_uuid, lvid, network_type, physical_network,
segmentation_id)
LOG.info(_LI("Assigning %(vlan_id)s as local vlan for "
"net-id=%(net_uuid)s"),
@ -713,8 +707,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
:param net_uuid: the network uuid associated with this vlan.
'''
lvm = self.local_vlan_map.pop(net_uuid, None)
if lvm is None:
try:
lvm = vlanmanager.LocalVlanManager().pop(net_uuid)
except KeyError:
LOG.debug("Network %s not used on agent.", net_uuid)
return
@ -786,10 +781,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
:param device_owner: the string indicative of owner of this port
:param ovs_restarted: indicates if this is called for an OVS restart.
'''
if net_uuid not in self.local_vlan_map or ovs_restarted:
if net_uuid not in self.vlan_manager or ovs_restarted:
self.provision_local_vlan(net_uuid, network_type,
physical_network, segmentation_id)
lvm = self.local_vlan_map[net_uuid]
lvm = self.vlan_manager.get(net_uuid)
lvm.vif_ports[port.vif_id] = port
self.dvr_agent.bind_port_to_dvr(port, lvm,
@ -824,8 +819,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
info_by_port = {x['name']: [x['tag'], x['other_config']]
for x in port_info}
for port_detail in need_binding_ports:
lvm = self.local_vlan_map.get(port_detail['network_id'])
if not lvm:
try:
lvm = self.vlan_manager.get(port_detail['network_id'])
except vlanmanager.MappingNotFound:
continue
port = port_detail['vif_port']
cur_info = info_by_port.get(port.port_name)
@ -844,8 +840,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
"Port", columns=["name", "tag"], ports=port_names, if_exists=True)
tags_by_name = {x['name']: x['tag'] for x in port_info}
for port_detail in need_binding_ports:
lvm = self.local_vlan_map.get(port_detail['network_id'])
if not lvm:
try:
lvm = self.vlan_manager.get(port_detail['network_id'])
except vlanmanager.MappingNotFound:
# network for port was deleted. skip this port since it
# will need to be handled as a DEAD port in the next scan
continue
@ -953,15 +950,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
:param vif_id: the id of the vif
:param net_uuid: the net_uuid this port is associated with.
'''
if net_uuid is None:
net_uuid = self.get_net_uuid(vif_id)
if not self.local_vlan_map.get(net_uuid):
LOG.info(_LI('port_unbound(): net_uuid %s not in local_vlan_map'),
net_uuid)
try:
net_uuid = net_uuid or self.vlan_manager.get_net_uuid(vif_id)
except vlanmanager.VifIdNotFound:
LOG.info(
_LI('port_unbound(): net_uuid %s not managed by VLAN manager'),
net_uuid)
return
lvm = self.local_vlan_map[net_uuid]
lvm = self.vlan_manager.get(net_uuid)
if vif_id in lvm.vif_ports:
vif_port = lvm.vif_ports[vif_id]
@ -1374,7 +1371,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
"""
port_tags = self.int_br.get_port_tag_dict()
changed_ports = set()
for lvm in self.local_vlan_map.values():
for lvm in self.vlan_manager:
for port in lvm.vif_ports.values():
if (
port.port_name in port_tags
@ -1450,7 +1447,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ofports = self.tun_br_ofports[tunnel_type].values()
if ofports and not self.l2_pop:
# Update flooding flows to include the new tunnel
for vlan_mapping in list(self.local_vlan_map.values()):
for vlan_mapping in self.vlan_manager:
if vlan_mapping.network_type == tunnel_type:
br.install_flood_to_tun(vlan_mapping.vlan,
vlan_mapping.segmentation_id,
@ -1470,7 +1467,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
# Check if this tunnel port is still used
for lvm in self.local_vlan_map.values():
for lvm in self.vlan_manager:
if tun_ofport in lvm.tun_ofports:
break
# If not, remove it

View File

@ -0,0 +1,117 @@
# Copyright 2016 Red Hat, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import debtcollector
from neutron_lib import exceptions
from neutron._i18n import _
def deprecate_local_vlan_map_in_object(object_name):
debtcollector.deprecate(
"local_vlan_map argument for %s was deprecated." % object_name,
version="Newton", removal_version="Ocata")
class VifIdNotFound(exceptions.NeutronException):
message = _('VIF ID %(vif_id)s not found in any network managed by '
'VLAN Manager')
class MappingAlreadyExists(exceptions.NeutronException):
message = _('VLAN mapping for network with id %(net_id)s already exists')
class MappingNotFound(exceptions.NeutronException):
message = _('Mapping for network %(net_id)s not found.')
class LocalVLANMapping(object):
def __init__(self, vlan, network_type, physical_network, segmentation_id,
vif_ports=None):
self.vlan = vlan
self.network_type = network_type
self.physical_network = physical_network
self.segmentation_id = segmentation_id
self.vif_ports = vif_ports or {}
# set of tunnel ports on which packets should be flooded
self.tun_ofports = set()
def __str__(self):
return ("lv-id = %s type = %s phys-net = %s phys-id = %s" %
(self.vlan, self.network_type, self.physical_network,
self.segmentation_id))
def __eq__(self, other):
return all(hasattr(other, a) and getattr(self, a) == getattr(other, a)
for a in ['vlan',
'network_type',
'physical_network',
'segmentation_id',
'vif_ports'])
def __hash__(self):
return id(self)
class LocalVlanManager(object):
"""Singleton manager that maps internal VLAN mapping to external network
segmentation ids.
"""
def __new__(cls):
if not hasattr(cls, '_instance'):
cls._instance = super(LocalVlanManager, cls).__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'mapping'):
self.mapping = {}
def __contains__(self, key):
return key in self.mapping
def __iter__(self):
for value in list(self.mapping.values()):
yield value
def items(self):
for item in self.mapping.items():
yield item
def add(self, net_id, vlan, network_type, physical_network,
segmentation_id, vif_ports=None):
if net_id in self.mapping:
raise MappingAlreadyExists(net_id=net_id)
self.mapping[net_id] = LocalVLANMapping(
vlan, network_type, physical_network, segmentation_id, vif_ports)
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in self.mapping.items():
if vif_id in vlan_mapping.vif_ports:
return network_id
raise VifIdNotFound(vif_id=vif_id)
def get(self, net_id):
try:
return self.mapping[net_id]
except KeyError:
raise MappingNotFound(net_id=net_id)
def pop(self, net_id):
try:
return self.mapping.pop(net_id)
except KeyError:
raise MappingNotFound(net_id=net_id)

View File

@ -35,7 +35,6 @@ from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent
from neutron.tests import base
from neutron.tests import tools
from neutron.tests.unit.extensions import test_securitygroup as test_sg
@ -3121,12 +3120,8 @@ class TestSecurityGroupAgentWithOVSIptables(
test_rpc_v1_1)
def _init_agent(self, defer_refresh_firewall):
fake_map = ovs_neutron_agent.LocalVLANMapping(1, 'network_type',
'physical_network', 1)
local_vlan_map = {'fakenet': fake_map}
self.agent = sg_rpc.SecurityGroupAgentRpc(
context=None, plugin_rpc=self.rpc,
local_vlan_map=local_vlan_map,
defer_refresh_firewall=defer_refresh_firewall)
self._enforce_order_in_firewall(self.agent.firewall)

View File

@ -20,8 +20,9 @@ import mock
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent
from neutron.tests import base
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent import \
test_vlanmanager
class FakeNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin):
@ -53,6 +54,8 @@ class TestL2populationRpcCallBackTunnelMixinBase(base.BaseTestCase):
def setUp(self):
super(TestL2populationRpcCallBackTunnelMixinBase, self).setUp()
self.vlan_manager = self.useFixture(
test_vlanmanager.LocalVlanManagerFixture()).manager
self.fakeagent = FakeNeutronAgent()
self.fakebr = mock.Mock()
Port = collections.namedtuple('Port', 'ip, ofport')
@ -112,21 +115,13 @@ class TestL2populationRpcCallBackTunnelMixinBase(base.BaseTestCase):
},
}
self.lvm1 = ovs_neutron_agent.LocalVLANMapping(
self.lvms[0].vlan, self.type_gre, self.lvms[0].phys,
self.lvms[0].segid, {self.lvms[0].vif: self.lvms[0].port})
self.lvm2 = ovs_neutron_agent.LocalVLANMapping(
self.lvms[1].vlan, self.type_gre, self.lvms[1].phys,
self.lvms[1].segid, {self.lvms[1].vif: self.lvms[1].port})
self.lvm3 = ovs_neutron_agent.LocalVLANMapping(
self.lvms[2].vlan, self.type_gre, self.lvms[2].phys,
self.lvms[2].segid, {self.lvms[2].vif: self.lvms[2].port})
self.local_vlan_map1 = {
self.lvms[0].net: self.lvm1,
self.lvms[1].net: self.lvm2,
self.lvms[2].net: self.lvm3,
}
for i in range(3):
self.vlan_manager.add(
self.lvms[i].net,
self.lvms[i].vlan, self.type_gre, self.lvms[i].phys,
self.lvms[i].segid, {self.lvms[i].vif: self.lvms[i].port})
setattr(self, 'lvm%d' % i,
self.vlan_manager.get(self.lvms[i].net))
self.upd_fdb_entry1_val = {
self.lvms[0].net: {

View File

@ -25,20 +25,22 @@ class TestL2populationRpcCallBackTunnelMixin(
l2population_rpc_base.TestL2populationRpcCallBackTunnelMixinBase):
def test_get_agent_ports_no_data(self):
# Make sure vlan manager has no mappings that were added in setUp()
self.vlan_manager.mapping = {}
self.assertFalse(
list(self.fakeagent.get_agent_ports(self.fdb_entries1, {})))
list(self.fakeagent.get_agent_ports(self.fdb_entries1)))
def test_get_agent_ports_non_existence_key_in_lvm(self):
results = {}
del self.local_vlan_map1[self.lvms[1].net]
self.vlan_manager.pop(self.lvms[1].net)
for lvm, agent_ports in self.fakeagent.get_agent_ports(
self.fdb_entries1, self.local_vlan_map1):
self.fdb_entries1):
results[lvm] = agent_ports
expected = {
self.lvm1: {
self.lvm0: {
self.ports[0].ip: [(self.lvms[0].mac, self.lvms[0].ip)],
self.local_ip: []},
self.lvm3: {
self.lvm2: {
self.ports[2].ip: [(self.lvms[2].mac, self.lvms[2].ip)],
self.local_ip: []},
}
@ -48,14 +50,14 @@ class TestL2populationRpcCallBackTunnelMixin(
results = {}
self.fdb_entries1[self.lvms[1].net]['ports'] = {}
for lvm, agent_ports in self.fakeagent.get_agent_ports(
self.fdb_entries1, self.local_vlan_map1):
self.fdb_entries1):
results[lvm] = agent_ports
expected = {
self.lvm1: {
self.lvm0: {
self.ports[0].ip: [(self.lvms[0].mac, self.lvms[0].ip)],
self.local_ip: []},
self.lvm2: {},
self.lvm3: {
self.lvm1: {},
self.lvm2: {
self.ports[2].ip: [(self.lvms[2].mac, self.lvms[2].ip)],
self.local_ip: []},
}
@ -65,40 +67,40 @@ class TestL2populationRpcCallBackTunnelMixin(
with mock.patch.object(self.fakeagent, 'setup_tunnel_port'),\
mock.patch.object(self.fakeagent, 'add_fdb_flow'
) as mock_add_fdb_flow:
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr, (self.lvms[1].mac, self.lvms[1].ip),
self.ports[1].ip, self.lvm1, self.ports[1].ofport),
self.ports[1].ip, self.lvm0, self.ports[1].ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_add_fdb_flow.call_args_list))
def test_fdb_add_tun_non_existence_key_in_ofports(self):
ofport = self.lvm1.network_type + '0a0a0a0a'
ofport = self.lvm0.network_type + '0a0a0a0a'
del self.ofports[self.type_gre][self.ports[1].ip]
with mock.patch.object(self.fakeagent, 'setup_tunnel_port',
return_value=ofport
) as mock_setup_tunnel_port,\
mock.patch.object(self.fakeagent, 'add_fdb_flow'
) as mock_add_fdb_flow:
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
mock_setup_tunnel_port.assert_called_once_with(
self.fakebr, self.ports[1].ip, self.lvm1.network_type)
self.fakebr, self.ports[1].ip, self.lvm0.network_type)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr, (self.lvms[1].mac, self.lvms[1].ip),
self.ports[1].ip, self.lvm1, ofport),
self.ports[1].ip, self.lvm0, ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_add_fdb_flow.call_args_list))
@ -110,16 +112,16 @@ class TestL2populationRpcCallBackTunnelMixin(
) as mock_setup_tunnel_port,\
mock.patch.object(self.fakeagent, 'add_fdb_flow'
) as mock_add_fdb_flow:
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
mock_setup_tunnel_port.assert_called_once_with(
self.fakebr, self.ports[1].ip, self.lvm1.network_type)
self.fakebr, self.ports[1].ip, self.lvm0.network_type)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_add_fdb_flow.call_args_list))
@ -127,16 +129,16 @@ class TestL2populationRpcCallBackTunnelMixin(
def test_fdb_remove_tun(self):
with mock.patch.object(
self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr, (self.lvms[1].mac, self.lvms[1].ip),
self.ports[1].ip, self.lvm1, self.ports[1].ofport),
self.ports[1].ip, self.lvm0, self.ports[1].ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_del_fdb_flow.call_args_list))
@ -147,35 +149,35 @@ class TestL2populationRpcCallBackTunnelMixin(
) as mock_del_fdb_flow,\
mock.patch.object(self.fakeagent, 'cleanup_tunnel_port'
) as mock_cleanup_tunnel_port:
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr,
(n_const.FLOODING_ENTRY[0], n_const.FLOODING_ENTRY[1]),
self.ports[1].ip, self.lvm1, self.ports[1].ofport),
self.ports[1].ip, self.lvm0, self.ports[1].ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_del_fdb_flow.call_args_list))
mock_cleanup_tunnel_port.assert_called_once_with(
self.fakebr, self.ports[1].ofport, self.lvm1.network_type)
self.fakebr, self.ports[1].ofport, self.lvm0.network_type)
def test_fdb_remove_tun_non_existence_key_in_ofports(self):
del self.ofports[self.type_gre][self.ports[1].ip]
with mock.patch.object(
self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm0,
self.agent_ports,
self._tunnel_port_lookup)
expected = [
mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
self.ports[0].ip, self.lvm1, self.ports[0].ofport),
self.ports[0].ip, self.lvm0, self.ports[0].ofport),
mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
self.ports[2].ip, self.lvm1, self.ports[2].ofport),
self.ports[2].ip, self.lvm0, self.ports[2].ofport),
]
self.assertEqual(sorted(expected),
sorted(mock_del_fdb_flow.call_args_list))
@ -193,23 +195,24 @@ class TestL2populationRpcCallBackTunnelMixin(
'context', self.upd_fdb_entry1)
def test__fdb_chg_ip(self):
m_setup_entry_for_arp_reply = mock.Mock()
self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
self.upd_fdb_entry1_val, self.local_ip,
self.local_vlan_map1)
with mock.patch.object(
self.fakeagent,
'setup_entry_for_arp_reply') as m_setup_entry_for_arp_reply:
self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
self.upd_fdb_entry1_val,
self.local_ip)
expected = [
mock.call(self.fakebr, 'remove', self.lvm1.vlan, self.lvms[0].mac,
mock.call(self.fakebr, 'remove', self.lvm0.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[1].mac,
mock.call(self.fakebr, 'add', self.lvm0.vlan, self.lvms[1].mac,
self.lvms[1].ip),
mock.call(self.fakebr, 'remove', self.lvm0.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call(self.fakebr, 'add', self.lvm0.vlan, self.lvms[1].mac,
self.lvms[1].ip),
mock.call(self.fakebr, 'remove', self.lvm1.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[1].mac,
self.lvms[1].ip),
mock.call(self.fakebr, 'remove', self.lvm2.vlan, self.lvms[0].mac,
self.lvms[0].ip),
mock.call(self.fakebr, 'add', self.lvm2.vlan, self.lvms[2].mac,
mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[2].mac,
self.lvms[2].ip),
]
m_setup_entry_for_arp_reply.assert_has_calls(expected, any_order=True)
@ -233,8 +236,7 @@ class TestL2populationRpcCallBackTunnelMixin(
m_setup_entry_for_arp_reply = mock.Mock()
self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
upd_fdb_entry_val, self.local_ip,
self.local_vlan_map1)
upd_fdb_entry_val, self.local_ip)
self.assertFalse(m_setup_entry_for_arp_reply.call_count)
def test_fdb_chg_ip_tun_empty_before_after(self):
@ -247,6 +249,5 @@ class TestL2populationRpcCallBackTunnelMixin(
self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
# passing non-local ip
self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
upd_fdb_entry_val, "8.8.8.8",
self.local_vlan_map1)
upd_fdb_entry_val, "8.8.8.8")
self.assertFalse(m_setup_entry_for_arp_reply.call_count)

View File

@ -36,6 +36,8 @@ from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
from neutron.tests import base
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
import ovs_test_base
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
import test_vlanmanager
NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
@ -97,6 +99,7 @@ class TestOvsNeutronAgent(object):
def setUp(self):
super(TestOvsNeutronAgent, self).setUp()
self.useFixture(test_vlanmanager.LocalVlanManagerFixture())
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
@ -148,9 +151,8 @@ class TestOvsNeutronAgent(object):
fixed_ips = [{'subnet_id': 'my-subnet-uuid',
'ip_address': '1.1.1.1'}]
if old_local_vlan is not None:
self.agent.local_vlan_map[net_uuid] = (
self.mod_agent.LocalVLANMapping(
old_local_vlan, None, None, None))
self.agent.vlan_manager.add(
net_uuid, old_local_vlan, None, None, None)
with mock.patch.object(self.agent, 'int_br', autospec=True) as int_br:
int_br.db_get_val.return_value = db_get_val
int_br.set_db_attribute.return_value = True
@ -603,9 +605,8 @@ class TestOvsNeutronAgent(object):
br = self.br_int_cls('br-int')
mac = "ca:fe:de:ad:be:ef"
port = ovs_lib.VifPort(1, 1, 1, mac, br)
lvm = self.mod_agent.LocalVLANMapping(
1, '1', None, 1, {port.vif_id: port})
local_vlan_map = {'1': lvm}
self.agent.vlan_manager.add(
'1', 1, '1', None, 1, {port.vif_id: port})
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
port_tags_dict = {1: []}
@ -613,8 +614,7 @@ class TestOvsNeutronAgent(object):
added=set([3]), current=vif_port_set,
removed=set([2]), updated=set([1])
)
with mock.patch.dict(self.agent.local_vlan_map, local_vlan_map),\
mock.patch.object(self.agent, 'tun_br', autospec=True):
with mock.patch.object(self.agent, 'tun_br', autospec=True):
actual = self.mock_scan_ports(
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
self.assertEqual(expected, actual)
@ -646,8 +646,9 @@ class TestOvsNeutronAgent(object):
new_failed_devices_retries_map)
def test_add_port_tag_info(self):
self.agent.local_vlan_map["net1"] = mock.Mock()
self.agent.local_vlan_map["net1"].vlan = "1"
lvm = mock.Mock()
lvm.vlan = "1"
self.agent.vlan_manager.mapping["net1"] = lvm
ovs_db_list = [{'name': 'tap1',
'tag': [],
'other_config': {'segmentation_id': '1'}},
@ -682,7 +683,7 @@ class TestOvsNeutronAgent(object):
def test_bind_devices(self):
devices_up = ['tap1']
devices_down = ['tap2']
self.agent.local_vlan_map["net1"] = mock.Mock()
self.agent.vlan_manager.mapping["net1"] = mock.Mock()
ovs_db_list = [{'name': 'tap1', 'tag': []},
{'name': 'tap2', 'tag': []}]
vif_port1 = mock.Mock()
@ -714,8 +715,7 @@ class TestOvsNeutronAgent(object):
self.agent.prevent_arp_spoofing = enable_prevent_arp_spoofing
ovs_db_list = [{'name': 'fake_device', 'tag': []}]
self.agent.local_vlan_map = {
'fake_network': ovs_agent.LocalVLANMapping(1, None, None, 1)}
self.agent.vlan_manager.add('fake_network', 1, None, None, 1)
vif_port = mock.Mock()
vif_port.port_name = 'fake_device'
vif_port.ofport = 1
@ -1423,7 +1423,7 @@ class TestOvsNeutronAgent(object):
lvm = mock.Mock()
lvm.network_type = "gre"
lvm.vif_ports = {"vif1": mock.Mock()}
self.agent.local_vlan_map["netuid12345"] = lvm
self.agent.vlan_manager.mapping["netuid12345"] = lvm
self.agent.port_unbound("vif1", "netuid12345")
self.assertTrue(reclvl_fn.called)
@ -1446,7 +1446,7 @@ class TestOvsNeutronAgent(object):
lvm2.vlan = 'vlan2'
lvm2.segmentation_id = 'seg2'
lvm2.tun_ofports = set(['1', '2'])
self.agent.local_vlan_map = {'net1': lvm1, 'net2': lvm2}
self.agent.vlan_manager.mapping = {'net1': lvm1, 'net2': lvm2}
self.agent.tun_br_ofports = {'gre':
{'1.1.1.1': '1', '2.2.2.2': '2'}}
self.agent.arp_responder_enabled = True
@ -2050,19 +2050,17 @@ class TestOvsNeutronAgent(object):
"""
def add_new_vlan_mapping(*args, **kwargs):
self.agent.local_vlan_map['bar'] = (
self.mod_agent.LocalVLANMapping(1, 2, 3, 4))
self.agent.vlan_manager.add('bar', 1, 2, 3, 4)
bridge = mock.Mock()
tunnel_type = 'vxlan'
self.agent.tun_br_ofports = {tunnel_type: dict()}
self.agent.l2_pop = False
self.agent.local_vlan_map = {
'foo': self.mod_agent.LocalVLANMapping(4, tunnel_type, 2, 1)}
self.agent.vlan_manager.add('foo', 4, tunnel_type, 2, 1)
self.agent.local_ip = '2.3.4.5'
bridge.install_flood_to_tun.side_effect = add_new_vlan_mapping
self.agent._setup_tunnel_port(bridge, 1, '1.2.3.4',
tunnel_type=tunnel_type)
self.assertIn('bar', self.agent.local_vlan_map)
self.assertIn('bar', self.agent.vlan_manager)
def test_setup_entry_for_arp_reply_ignores_ipv6_addresses(self):
self.agent.arp_responder_enabled = True
@ -2296,7 +2294,8 @@ class TestOvsDvrNeutronAgent(object):
self.agent.patch_tun_ofport = 1
self.agent.patch_int_ofport = 2
self.agent.dvr_agent.local_ports = {}
self.agent.local_vlan_map = {}
self.agent.vlan_manager = self.useFixture(
test_vlanmanager.LocalVlanManagerFixture()).manager
self.agent.dvr_agent.enable_distributed_routing = True
self.agent.dvr_agent.enable_tunneling = True
self.agent.dvr_agent.patch_tun_ofport = 1
@ -2397,7 +2396,7 @@ class TestOvsDvrNeutronAgent(object):
n_const.DEVICE_OWNER_DVR_INTERFACE, False)
phy_ofp = self.agent.dvr_agent.phys_ofports[physical_network]
int_ofp = self.agent.dvr_agent.int_ofports[physical_network]
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
expected_on_phys_br = [
mock.call.provision_local_vlan(
port=phy_ofp,
@ -2485,7 +2484,7 @@ class TestOvsDvrNeutronAgent(object):
self._port, self._net_uuid, network_type,
physical_network, segmentation_id, self._fixed_ips,
n_const.DEVICE_OWNER_DVR_INTERFACE, False)
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
expected_on_int_br = self._expected_port_bound(
self._port, lvid)
expected_on_tun_br = [
@ -2567,7 +2566,7 @@ class TestOvsDvrNeutronAgent(object):
def test_port_bound_for_dvr_with_csnat_ports(self):
self._setup_for_dvr_test()
int_br, tun_br = self._port_bound_for_dvr_with_csnat_ports()
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
expected_on_int_br = [
mock.call.install_dvr_to_src_mac(
network_type='vxlan',
@ -2594,7 +2593,7 @@ class TestOvsDvrNeutronAgent(object):
# simulate a replug
self._port.ofport = 12
int_br, tun_br = self._port_bound_for_dvr_with_csnat_ports()
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
expected_on_int_br = [
mock.call.delete_dvr_to_src_mac(
network_type='vxlan',
@ -2715,7 +2714,7 @@ class TestOvsDvrNeutronAgent(object):
None, None, self._fixed_ips,
n_const.DEVICE_OWNER_DVR_INTERFACE,
False)
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
self.assertEqual(self._expected_port_bound(self._port, lvid),
int_br.mock_calls)
expected_on_tun_br = [
@ -2750,7 +2749,7 @@ class TestOvsDvrNeutronAgent(object):
failed_devices = {'added': set(), 'removed': set()}
failed_devices['removed'] = self.agent.treat_devices_removed(
[self._port.vif_id])
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
if ip_version == 4:
expected = [
mock.call.delete_dvr_process_ipv4(
@ -2812,7 +2811,7 @@ class TestOvsDvrNeutronAgent(object):
None, None, self._fixed_ips,
n_const.DEVICE_OWNER_DVR_INTERFACE,
False)
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
self.assertEqual(
self._expected_port_bound(self._port, lvid),
int_br.mock_calls)
@ -2927,7 +2926,7 @@ class TestOvsDvrNeutronAgent(object):
None, None, self._fixed_ips,
n_const.DEVICE_OWNER_ROUTER_SNAT,
False)
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
lvid = self.agent.vlan_manager.get(self._net_uuid).vlan
expected_on_int_br = [
mock.call.install_dvr_to_src_mac(
network_type='vxlan',

View File

@ -28,6 +28,8 @@ from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
import ovs_test_base
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
import test_vlanmanager
def nonzero(f):
@ -74,6 +76,7 @@ class TunnelTest(object):
def setUp(self):
super(TunnelTest, self).setUp()
self.useFixture(test_vlanmanager.LocalVlanManagerFixture())
conn_patcher = mock.patch(
'neutron.agent.ovsdb.native.connection.Connection.start')
conn_patcher.start()
@ -93,11 +96,11 @@ class TunnelTest(object):
self.MAP_TUN_INT_OFPORT = 33333
self.MAP_TUN_PHY_OFPORT = 44444
self.LVM = self.mod_agent.LocalVLANMapping(
self.LVM_DATA = (
LV_ID, 'gre', None, LS_ID, VIF_PORTS)
self.LVM_FLAT = self.mod_agent.LocalVLANMapping(
self.LVM_FLAT_DATA = (
LV_ID, 'flat', 'net1', LS_ID, VIF_PORTS)
self.LVM_VLAN = self.mod_agent.LocalVLANMapping(
self.LVM_VLAN_DATA = (
LV_ID, 'vlan', 'net1', LS_ID, VIF_PORTS)
self.inta = mock.Mock()
@ -420,16 +423,16 @@ class TunnelTest(object):
a = self._build_agent()
a.available_local_vlans = set()
a.local_vlan_map[NET_UUID] = self.LVM
a.vlan_manager.add(NET_UUID, *self.LVM_DATA)
a.reclaim_local_vlan(NET_UUID)
self.assertIn(self.LVM.vlan, a.available_local_vlans)
self.assertIn(self.LVM_DATA[0], a.available_local_vlans)
self._verify_mock_calls()
def test_reclaim_local_vlan_flat(self):
self.mock_map_tun_bridge_expected.append(
mock.call.reclaim_local_vlan(
port=self.MAP_TUN_PHY_OFPORT,
lvid=self.LVM_FLAT.vlan))
lvid=self.LVM_FLAT_DATA[0]))
self.mock_int_bridge_expected.append(
mock.call.reclaim_local_vlan(
port=self.INT_OFPORT,
@ -440,16 +443,16 @@ class TunnelTest(object):
a.int_ofports['net1'] = self.INT_OFPORT
a.available_local_vlans = set()
a.local_vlan_map[NET_UUID] = self.LVM_FLAT
a.vlan_manager.add(NET_UUID, *self.LVM_FLAT_DATA)
a.reclaim_local_vlan(NET_UUID)
self.assertIn(self.LVM_FLAT.vlan, a.available_local_vlans)
self.assertIn(self.LVM_FLAT_DATA[0], a.available_local_vlans)
self._verify_mock_calls()
def test_reclaim_local_vlan_vlan(self):
self.mock_map_tun_bridge_expected.append(
mock.call.reclaim_local_vlan(
port=self.MAP_TUN_PHY_OFPORT,
lvid=self.LVM_VLAN.vlan))
lvid=self.LVM_VLAN_DATA[0]))
self.mock_int_bridge_expected.append(
mock.call.reclaim_local_vlan(
port=self.INT_OFPORT,
@ -460,9 +463,9 @@ class TunnelTest(object):
a.int_ofports['net1'] = self.INT_OFPORT
a.available_local_vlans = set()
a.local_vlan_map[NET_UUID] = self.LVM_VLAN
a.vlan_manager.add(NET_UUID, *self.LVM_VLAN_DATA)
a.reclaim_local_vlan(NET_UUID)
self.assertIn(self.LVM_VLAN.vlan, a.available_local_vlans)
self.assertIn(self.LVM_VLAN_DATA[0], a.available_local_vlans)
self._verify_mock_calls()
def test_port_bound(self):
@ -477,7 +480,7 @@ class TunnelTest(object):
vlan_mapping)]
a = self._build_agent()
a.local_vlan_map[NET_UUID] = self.LVM
a.vlan_manager.add(NET_UUID, *self.LVM_DATA)
a.local_dvr_map = {}
self.ovs_bridges[self.INT_BRIDGE].db_get_val.return_value = {}
a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID,
@ -488,7 +491,7 @@ class TunnelTest(object):
with mock.patch.object(self.mod_agent.OVSNeutronAgent,
'reclaim_local_vlan') as reclaim_local_vlan:
a = self._build_agent()
a.local_vlan_map[NET_UUID] = self.LVM
a.vlan_manager.add(NET_UUID, *self.LVM_DATA)
a.port_unbound(VIF_ID, NET_UUID)
reclaim_local_vlan.assert_called_once_with(NET_UUID)
@ -507,7 +510,7 @@ class TunnelTest(object):
a = self._build_agent()
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = self.LVM
a.vlan_manager.add(NET_UUID, *self.LVM_DATA)
self.ovs_bridges[self.INT_BRIDGE].db_get_val.return_value = mock.Mock()
a.port_dead(VIF_PORT)
self._verify_mock_calls()

View File

@ -0,0 +1,126 @@
# Copyright 2016 Red Hat, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import testtools
from neutron.plugins.ml2.drivers.openvswitch.agent import vlanmanager
from neutron.tests import base
class LocalVlanManagerFixture(fixtures.Fixture):
def _setUp(self):
super(LocalVlanManagerFixture, self)._setUp()
self.vlan_manager = vlanmanager.LocalVlanManager()
self.addCleanup(self.restore_manager)
# Remove _instance attribute from VlanManager in order to not obtain a
# singleton
del vlanmanager.LocalVlanManager._instance
self.manager = vlanmanager.LocalVlanManager()
def restore_manager(self):
vlanmanager.LocalVlanManager._instance = self.vlan_manager
class TestLocalVLANMapping(base.BaseTestCase):
def test___eq___equal(self):
mapping1 = vlanmanager.LocalVLANMapping(1, 2, 3, 4, 5)
mapping2 = vlanmanager.LocalVLANMapping(1, 2, 3, 4, 5)
self.assertEqual(mapping1, mapping2)
def test___eq___different(self):
mapping1 = vlanmanager.LocalVLANMapping(1, 2, 3, 4, 5)
mapping2 = vlanmanager.LocalVLANMapping(1, 2, 4, 4, 5)
self.assertNotEqual(mapping1, mapping2)
def test___eq___different_type(self):
mapping = vlanmanager.LocalVLANMapping(1, 2, 3, 4, 5)
self.assertNotEqual(mapping, "foo")
class TestLocalVlanManager(base.BaseTestCase):
def setUp(self):
super(TestLocalVlanManager, self).setUp()
self.vlan_manager = self.useFixture(LocalVlanManagerFixture()).manager
def test_is_singleton(self):
self.vlan_manager.add(1, None, None, None, None)
new_vlan_manager = vlanmanager.LocalVlanManager()
self.assertIs(new_vlan_manager, self.vlan_manager)
self.assertItemsEqual(new_vlan_manager.mapping,
self.vlan_manager.mapping)
def test_in_operator_on_key(self):
self.vlan_manager.add(1, None, None, None, None)
self.assertIn(1, self.vlan_manager)
self.assertNotIn(2, self.vlan_manager)
def test_iterator_returns_vlan_mappings(self):
created_vlans = []
for val in range(3):
self.vlan_manager.add(val, val, val, val, val)
created_vlans.append(self.vlan_manager.get(val))
self.assertItemsEqual(created_vlans, list(self.vlan_manager))
def test_get_net_uuid_existing(self):
port_id = 'port-id'
vlan_data = (2, 3, 4, 5, {port_id: 'port'})
net_id = 1
self.vlan_manager.add(net_id, *vlan_data)
obtained_net_id = self.vlan_manager.get_net_uuid(port_id)
self.assertEqual(net_id, obtained_net_id)
def test_get_net_uuid_non_existing_raises_exception(self):
vlan_data = (1, 2, 3, 4, 5, {'port_id': 'port'})
self.vlan_manager.add(*vlan_data)
with testtools.ExpectedException(vlanmanager.VifIdNotFound):
self.vlan_manager.get_net_uuid('non-existing-port')
def test_add_and_get(self):
vlan_data = (2, 3, 4, 5, 6)
expected_vlan_mapping = vlanmanager.LocalVLANMapping(*vlan_data)
self.vlan_manager.add(1, *vlan_data)
vlan_mapping = self.vlan_manager.get(1)
self.assertEqual(expected_vlan_mapping, vlan_mapping)
def test_add_existing_raises_exception(self):
vlan_data = (2, 3, 4, 5, 6)
self.vlan_manager.add(1, *vlan_data)
with testtools.ExpectedException(vlanmanager.MappingAlreadyExists):
self.vlan_manager.add(1, *vlan_data)
def test_get_non_existing_raises_keyerror(self):
with testtools.ExpectedException(vlanmanager.MappingNotFound):
self.vlan_manager.get(1)
def test_pop(self):
vlan_data = (2, 3, 4, 5, 6)
expected_vlan_mapping = vlanmanager.LocalVLANMapping(*vlan_data)
self.vlan_manager.add(1, *vlan_data)
vlan_mapping = self.vlan_manager.pop(1)
self.assertEqual(expected_vlan_mapping, vlan_mapping)
self.assertFalse(self.vlan_manager.mapping)
def test_pop_non_existing_raises_exception(self):
with testtools.ExpectedException(vlanmanager.MappingNotFound):
self.vlan_manager.pop(1)
class TestDeprecationMessage(base.BaseTestCase):
def test_deprecation_message(self):
"""Test that calling function doesn't crash"""
vlanmanager.deprecate_local_vlan_map_in_object("foo")