use plugin constants from neutron-lib

neutron-lib contains a number of the plugin related constants from
neutron.plugins.common.constants. This patch consumes those constants
from neutron-lib and removes them from neutron. In addition the notion
of the dummy plugin service type is moved strictly into the test
package of neutron since it's not a real service plugin.

NeutronLibImpact

Change-Id: I767c626f3fe6159ab3abd6a7ae3cb9893b79bf66
This commit is contained in:
Boden R 2017-05-31 07:56:18 -06:00
parent ed55c26171
commit 95f1e03446
65 changed files with 286 additions and 350 deletions
neutron
agent/common
cmd/sanity
conf/plugins/ml2/drivers
core_extensions
extensions
objects
plugins
services
flavors
l3_router/service_providers
tests

@ -21,6 +21,7 @@ import time
import uuid
from debtcollector import removals
from neutron_lib import constants as p_const
from neutron_lib import exceptions
from oslo_config import cfg
from oslo_log import log as logging
@ -32,7 +33,6 @@ from neutron.agent.common import ip_lib
from neutron.agent.common import utils
from neutron.agent.ovsdb import api as ovsdb_api
from neutron.conf.agent import ovs_conf
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants

@ -34,7 +34,6 @@ from neutron.agent.linux import utils as agent_utils
from neutron.cmd import runtime_checks
from neutron.common import constants
from neutron.common import utils as common_utils
from neutron.plugins.common import constants as const
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants as ovs_const
@ -49,14 +48,14 @@ MINIMUM_DIBBLER_VERSION = '1.0.1'
def ovs_vxlan_supported(from_ip='192.0.2.1', to_ip='192.0.2.2'):
name = common_utils.get_rand_device_name(prefix='vxlantest-')
with ovs_lib.OVSBridge(name) as br:
port = br.add_tunnel_port(from_ip, to_ip, const.TYPE_VXLAN)
port = br.add_tunnel_port(from_ip, to_ip, n_consts.TYPE_VXLAN)
return port != ovs_lib.INVALID_OFPORT
def ovs_geneve_supported(from_ip='192.0.2.3', to_ip='192.0.2.4'):
name = common_utils.get_rand_device_name(prefix='genevetest-')
with ovs_lib.OVSBridge(name) as br:
port = br.add_tunnel_port(from_ip, to_ip, const.TYPE_GENEVE)
port = br.add_tunnel_port(from_ip, to_ip, n_consts.TYPE_GENEVE)
return port != ovs_lib.INVALID_OFPORT

@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants as p_const
from oslo_config import cfg
from neutron._i18n import _
from neutron.plugins.common import constants as p_const
gre_opts = [

@ -16,7 +16,6 @@ from neutron_lib import constants as n_const
from oslo_config import cfg
from neutron._i18n import _
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants
@ -111,7 +110,7 @@ agent_opts = [
cfg.ListOpt('tunnel_types', default=DEFAULT_TUNNEL_TYPES,
help=_("Network types supported by the agent "
"(gre and/or vxlan).")),
cfg.PortOpt('vxlan_udp_port', default=p_const.VXLAN_UDP_PORT,
cfg.PortOpt('vxlan_udp_port', default=n_const.VXLAN_UDP_PORT,
help=_("The UDP port to use for VXLAN tunnels.")),
cfg.IntOpt('veth_mtu', default=9000,
help=_("MTU size of veth interfaces")),

@ -13,13 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
from neutron.common import exceptions as n_exc
from neutron.core_extensions import base
from neutron.db import api as db_api
from neutron.objects.qos import policy as policy_object
from neutron.plugins.common import constants as plugin_constants
from neutron.services.qos import qos_consts

@ -17,13 +17,13 @@ from neutron_lib.api import extensions as api_extensions
from neutron_lib.api import validators
from neutron_lib.db import constants as db_const
from neutron_lib import exceptions as nexception
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from neutron._i18n import _
from neutron.api import extensions
from neutron.api.v2 import base
from neutron.api.v2 import resource_helper
from neutron.plugins.common import constants
# Flavor Exceptions

@ -18,12 +18,12 @@ from neutron_lib.api import converters
from neutron_lib.api import extensions
from neutron_lib.db import constants as db_const
from neutron_lib import exceptions as nexception
from neutron_lib.plugins import constants
from neutron_lib.services import base as service_base
import six
from neutron._i18n import _
from neutron.api.v2 import resource_helper
from neutron.plugins.common import constants
class MeteringLabelNotFound(nexception.NotFound):

@ -20,6 +20,7 @@ import re
from neutron_lib.api import converters
from neutron_lib.api import extensions as api_extensions
from neutron_lib.db import constants as db_const
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from neutron_lib.services import base as service_base
import six
@ -29,7 +30,6 @@ from neutron.api.v2 import base
from neutron.api.v2 import resource_helper
from neutron.common import constants as common_constants
from neutron.objects.qos import rule as rule_object
from neutron.plugins.common import constants
from neutron.services.qos import qos_consts

@ -26,7 +26,6 @@ import six
from neutron._i18n import _
from neutron.common import constants
from neutron.common import utils
from neutron.plugins.common import constants as plugin_constants
class HARouterEnumField(obj_fields.AutoTypedField):
@ -85,8 +84,8 @@ class PortRangeWith0Field(obj_fields.AutoTypedField):
class VlanIdRange(RangeConstrainedInteger):
def __init__(self, **kwargs):
super(VlanIdRange, self).__init__(start=plugin_constants.MIN_VLAN_TAG,
end=plugin_constants.MAX_VLAN_TAG,
super(VlanIdRange, self).__init__(start=lib_constants.MIN_VLAN_TAG,
end=lib_constants.MAX_VLAN_TAG,
**kwargs)

@ -10,7 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.plugins.common import constants
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_log import log as logging
from oslo_utils import versionutils

@ -13,31 +13,23 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.plugins import constants
from neutron_lib import constants
from neutron_lib.plugins import constants as p_const
# Neutron well-known service type constants:
DUMMY = "DUMMY"
LOADBALANCER = "LOADBALANCER"
LOADBALANCERV2 = "LOADBALANCERV2"
FIREWALL = "FIREWALL"
VPN = "VPN"
METERING = "METERING"
FLAVORS = "FLAVORS"
QOS = "QOS"
# TODO(boden): consume once I208c976c3e7e43e27e1907ed196af8efccd73f22 releases
LOG_API = "LOGGING"
# Maps extension alias to service type that
# can be implemented by the core plugin.
EXT_TO_SERVICE_MAPPING = {
'dummy': DUMMY,
'lbaas': LOADBALANCER,
'lbaasv2': LOADBALANCERV2,
'fwaas': FIREWALL,
'vpnaas': VPN,
'metering': METERING,
'lbaas': p_const.LOADBALANCER,
'lbaasv2': p_const.LOADBALANCERV2,
'fwaas': p_const.FIREWALL,
'vpnaas': p_const.VPN,
'metering': p_const.METERING,
'router': constants.L3,
'qos': QOS,
'qos': p_const.QOS,
}
# Maps default service plugins entry points to their extension aliases
@ -49,58 +41,3 @@ DEFAULT_SERVICE_PLUGINS = {
'flavors': 'flavors',
'revisions': 'revisions',
}
# Service operation status constants
ACTIVE = "ACTIVE"
DOWN = "DOWN"
CREATED = "CREATED"
PENDING_CREATE = "PENDING_CREATE"
PENDING_UPDATE = "PENDING_UPDATE"
PENDING_DELETE = "PENDING_DELETE"
INACTIVE = "INACTIVE"
ERROR = "ERROR"
ACTIVE_PENDING_STATUSES = (
ACTIVE,
PENDING_CREATE,
PENDING_UPDATE
)
# Network Type constants
TYPE_FLAT = 'flat'
TYPE_GENEVE = 'geneve'
TYPE_GRE = 'gre'
TYPE_LOCAL = 'local'
TYPE_VXLAN = 'vxlan'
TYPE_VLAN = 'vlan'
TYPE_NONE = 'none'
# Values for network_type
# For VLAN Network
MIN_VLAN_TAG = 1
MAX_VLAN_TAG = 4094
# For Geneve Tunnel
MIN_GENEVE_VNI = 1
MAX_GENEVE_VNI = 2 ** 24 - 1
# For GRE Tunnel
MIN_GRE_ID = 1
MAX_GRE_ID = 2 ** 32 - 1
# For VXLAN Tunnel
MIN_VXLAN_VNI = 1
MAX_VXLAN_VNI = 2 ** 24 - 1
VXLAN_UDP_PORT = 4789
# Overlay (tunnel) protocol overhead
GENEVE_ENCAP_MIN_OVERHEAD = 30
GRE_ENCAP_OVERHEAD = 22
VXLAN_ENCAP_OVERHEAD = 30
# IP header length
IP_HEADER_LENGTH = {
4: 20,
6: 40,
}

@ -35,7 +35,7 @@ import webob.exc
from neutron._i18n import _
from neutron.api.v2 import attributes
from neutron.common import exceptions as n_exc
from neutron.plugins.common import constants as p_const
INTERFACE_HASH_LEN = 6
LOG = logging.getLogger(__name__)
@ -53,26 +53,26 @@ def get_deployment_physnet_mtu():
def is_valid_vlan_tag(vlan):
return p_const.MIN_VLAN_TAG <= vlan <= p_const.MAX_VLAN_TAG
return n_const.MIN_VLAN_TAG <= vlan <= n_const.MAX_VLAN_TAG
def is_valid_gre_id(gre_id):
return p_const.MIN_GRE_ID <= gre_id <= p_const.MAX_GRE_ID
return n_const.MIN_GRE_ID <= gre_id <= n_const.MAX_GRE_ID
def is_valid_vxlan_vni(vni):
return p_const.MIN_VXLAN_VNI <= vni <= p_const.MAX_VXLAN_VNI
return n_const.MIN_VXLAN_VNI <= vni <= n_const.MAX_VXLAN_VNI
def is_valid_geneve_vni(vni):
return p_const.MIN_GENEVE_VNI <= vni <= p_const.MAX_GENEVE_VNI
return n_const.MIN_GENEVE_VNI <= vni <= n_const.MAX_GENEVE_VNI
def verify_tunnel_range(tunnel_range, tunnel_type):
"""Raise an exception for invalid tunnel range or malformed range."""
mappings = {p_const.TYPE_GRE: is_valid_gre_id,
p_const.TYPE_VXLAN: is_valid_vxlan_vni,
p_const.TYPE_GENEVE: is_valid_geneve_vni}
mappings = {n_const.TYPE_GRE: is_valid_gre_id,
n_const.TYPE_VXLAN: is_valid_vxlan_vni,
n_const.TYPE_GENEVE: is_valid_geneve_vni}
if tunnel_type in mappings:
for ident in tunnel_range:
if not mappings[tunnel_type](ident):
@ -147,9 +147,9 @@ def parse_network_vlan_ranges(network_vlan_ranges_cfg_entries):
def in_pending_status(status):
return status in (p_const.PENDING_CREATE,
p_const.PENDING_UPDATE,
p_const.PENDING_DELETE)
return status in (n_const.PENDING_CREATE,
n_const.PENDING_UPDATE,
n_const.PENDING_DELETE)
def _fixup_res_dict(context, attr_name, res_dict, check_allow_post=True):

@ -39,7 +39,6 @@ from neutron.common import exceptions
from neutron.common import profiler as setup_profiler
from neutron.common import topics
from neutron.common import utils
from neutron.plugins.common import constants as p_const
from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import _common_agent as ca
@ -197,7 +196,7 @@ class LinuxBridgeManager(amb.CommonAgentManagerBase):
return lb_utils.get_tap_device_name(interface_id)
def get_vxlan_device_name(self, segmentation_id):
if 0 <= int(segmentation_id) <= p_const.MAX_VXLAN_VNI:
if 0 <= int(segmentation_id) <= constants.MAX_VXLAN_VNI:
return VXLAN_INTERFACE_PREFIX + str(segmentation_id)
else:
LOG.warning("Invalid Segmentation ID: %s, will lead to "
@ -450,7 +449,7 @@ class LinuxBridgeManager(amb.CommonAgentManagerBase):
network_type,
physical_network,
segmentation_id):
if network_type == p_const.TYPE_VXLAN:
if network_type == constants.TYPE_VXLAN:
if self.vxlan_mode == lconst.VXLAN_NONE:
LOG.error("Unable to add vxlan interface for network %s",
network_id)
@ -465,10 +464,10 @@ class LinuxBridgeManager(amb.CommonAgentManagerBase):
" for physical network %s",
physical_network)
return
if network_type == p_const.TYPE_FLAT:
if network_type == constants.TYPE_FLAT:
return self.ensure_flat_bridge(network_id, physical_bridge,
physical_interface)
elif network_type == p_const.TYPE_VLAN:
elif network_type == constants.TYPE_VLAN:
return self.ensure_vlan_bridge(network_id, physical_bridge,
physical_interface,
segmentation_id)
@ -509,7 +508,7 @@ class LinuxBridgeManager(amb.CommonAgentManagerBase):
if not bridge_name:
bridge_name = self.get_bridge_name(network_id)
if network_type == p_const.TYPE_LOCAL:
if network_type == constants.TYPE_LOCAL:
self.ensure_local_bridge(network_id, bridge_name)
elif not self.ensure_physical_in_bridge(network_id,
network_type,
@ -666,7 +665,7 @@ class LinuxBridgeManager(amb.CommonAgentManagerBase):
return False
test_iface = None
for seg_id in moves.range(1, p_const.MAX_VXLAN_VNI + 1):
for seg_id in moves.range(1, constants.MAX_VXLAN_VNI + 1):
if (ip_lib.device_exists(self.get_vxlan_device_name(seg_id))
or ip_lib.vxlan_in_use(seg_id)):
continue
@ -781,7 +780,7 @@ class LinuxBridgeManager(amb.CommonAgentManagerBase):
}
if self.vxlan_mode != lconst.VXLAN_NONE:
configurations['tunneling_ip'] = self.local_ip
configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
configurations['tunnel_types'] = [constants.TYPE_VXLAN]
configurations['l2_population'] = cfg.CONF.VXLAN.l2_population
return configurations
@ -874,7 +873,7 @@ class LinuxBridgeRpcCallbacks(
if not segment:
return
if segment.network_type != p_const.TYPE_VXLAN:
if segment.network_type != constants.TYPE_VXLAN:
return
interface = self.agent.mgr.get_vxlan_device_name(
@ -896,7 +895,7 @@ class LinuxBridgeRpcCallbacks(
if not segment:
return
if segment.network_type != p_const.TYPE_VXLAN:
if segment.network_type != constants.TYPE_VXLAN:
return
interface = self.agent.mgr.get_vxlan_device_name(
@ -918,7 +917,7 @@ class LinuxBridgeRpcCallbacks(
if not segment:
return
if segment.network_type != p_const.TYPE_VXLAN:
if segment.network_type != constants.TYPE_VXLAN:
return
interface = self.agent.mgr.get_vxlan_device_name(

@ -17,7 +17,6 @@ from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from neutron.agent import securitygroups_rpc
from neutron.plugins.common import constants as p_constants
from neutron.plugins.ml2.drivers import mech_agent
from neutron.services.qos.drivers.linuxbridge import driver as lb_qos_driver
@ -42,8 +41,8 @@ class LinuxbridgeMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
def get_allowed_network_types(self, agent):
return (agent['configurations'].get('tunnel_types', []) +
[p_constants.TYPE_LOCAL, p_constants.TYPE_FLAT,
p_constants.TYPE_VLAN])
[constants.TYPE_LOCAL, constants.TYPE_FLAT,
constants.TYPE_VLAN])
def get_mappings(self, agent):
mappings = dict(agent['configurations'].get('interface_mappings', {}),

@ -29,7 +29,6 @@ from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import topics
from neutron.conf.plugins.ml2.drivers import macvtap as config
from neutron.plugins.common import constants as p_constants
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import _common_agent as ca
from neutron.plugins.ml2.drivers.macvtap import macvtap_common
@ -61,7 +60,7 @@ class MacvtapRPCCallBack(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
return
segment = self.network_map.get(network_id)
if segment and segment.network_type == p_constants.TYPE_VLAN:
if segment and segment.network_type == constants.TYPE_VLAN:
if_mappings = self.agent.mgr.interface_mappings
vlan_device_name = macvtap_common.get_vlan_device_name(
if_mappings[segment.physical_network],

@ -19,7 +19,6 @@ from neutron_lib import constants
from neutron_lib.plugins.ml2 import api
from oslo_log import log
from neutron.plugins.common import constants as p_constants
from neutron.plugins.ml2.drivers.macvtap import macvtap_common
from neutron.plugins.ml2.drivers import mech_agent
@ -45,7 +44,7 @@ class MacvtapMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
{portbindings.CAP_PORT_FILTER: False})
def get_allowed_network_types(self, agent):
return [p_constants.TYPE_FLAT, p_constants.TYPE_VLAN]
return [constants.TYPE_FLAT, constants.TYPE_VLAN]
def get_mappings(self, agent):
return agent['configurations'].get('interface_mappings', {})
@ -80,7 +79,7 @@ class MacvtapMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
interface = mappings[segment['physical_network']]
network_type = segment[api.NETWORK_TYPE]
if network_type == p_constants.TYPE_VLAN:
if network_type == constants.TYPE_VLAN:
vlan_id = segment[api.SEGMENTATION_ID]
macvtap_src = macvtap_common.get_vlan_device_name(interface,
vlan_id)

@ -23,7 +23,6 @@ from oslo_log import log
import six
from neutron.db import provisioning_blocks
from neutron.plugins.common import constants as p_constants
LOG = log.getLogger(__name__)
@ -249,7 +248,7 @@ class SimpleAgentMechanismDriverBase(AgentMechanismDriverBase):
'allowed_network_types': allowed_network_types})
return False
if network_type in [p_constants.TYPE_FLAT, p_constants.TYPE_VLAN]:
if network_type in [const.TYPE_FLAT, const.TYPE_VLAN]:
physnet = segment[api.PHYSICAL_NETWORK]
if not self.physnet_in_mappings(physnet, mappings):
LOG.debug(

@ -18,7 +18,6 @@ from neutron_lib import constants
from neutron_lib.plugins.ml2 import api
from oslo_log import log
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import mech_agent
from neutron.plugins.ml2.drivers.mech_sriov.mech_driver \
import exceptions as exc
@ -66,7 +65,7 @@ class SriovNicSwitchMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
sriov_qos_driver.register()
def get_allowed_network_types(self, agent):
return (p_const.TYPE_FLAT, p_const.TYPE_VLAN)
return (constants.TYPE_FLAT, constants.TYPE_VLAN)
def get_mappings(self, agent):
return agent['configurations'].get('device_mappings', {})
@ -157,9 +156,9 @@ class SriovNicSwitchMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
def _get_vif_details(self, segment):
network_type = segment[api.NETWORK_TYPE]
if network_type == p_const.TYPE_FLAT:
if network_type == constants.TYPE_FLAT:
vlan_id = FLAT_VLAN
elif network_type == p_const.TYPE_VLAN:
elif network_type == constants.TYPE_VLAN:
vlan_id = segment[api.SEGMENTATION_ID]
else:
raise exc.SriovUnsupportedNetworkType(net_type=network_type)

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron.plugins.common import constants as p_const
from neutron_lib import constants as p_const
# Special vlan_id value in ovs_vlan_allocations table indicating flat network

@ -21,12 +21,12 @@
import netaddr
from neutron_lib import constants as p_const
from oslo_log import log as logging
from ryu.lib.packet import ether_types
from ryu.lib.packet import icmpv6
from ryu.lib.packet import in_proto
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.native \
import ovs_bridge

@ -24,7 +24,6 @@ import netaddr
from neutron_lib import constants as const
from neutron.common import constants as n_const
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
import ovs_bridge
@ -71,7 +70,7 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge):
@staticmethod
def _dvr_to_src_mac_table_id(network_type):
if network_type == p_const.TYPE_VLAN:
if network_type == const.TYPE_VLAN:
return constants.DVR_TO_SRC_MAC_VLAN
else:
return constants.DVR_TO_SRC_MAC

@ -24,7 +24,6 @@ from osprofiler import profiler
from neutron.agent.common import ovs_lib
from neutron.common import utils as n_utils
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
LOG = logging.getLogger(__name__)
@ -374,7 +373,7 @@ class OVSDVRNeutronAgent(object):
ldm.set_dvr_owned(True)
vlan_to_use = lvm.vlan
if lvm.network_type == p_const.TYPE_VLAN:
if lvm.network_type == n_const.TYPE_VLAN:
vlan_to_use = lvm.segmentation_id
subnet_info = ldm.get_subnet_info()
@ -413,7 +412,7 @@ class OVSDVRNeutronAgent(object):
dst_mac=comp_ovsport.get_mac(),
dst_port=comp_ovsport.get_ofport())
if lvm.network_type == p_const.TYPE_VLAN:
if lvm.network_type == n_const.TYPE_VLAN:
# TODO(vivek) remove the IPv6 related flows once SNAT is not
# used for IPv6 DVR.
br = self.phys_brs[lvm.physical_network]
@ -474,7 +473,7 @@ class OVSDVRNeutronAgent(object):
ovsport.add_subnet(subnet_uuid)
self.local_ports[port.vif_id] = ovsport
vlan_to_use = lvm.vlan
if lvm.network_type == p_const.TYPE_VLAN:
if lvm.network_type == n_const.TYPE_VLAN:
vlan_to_use = lvm.segmentation_id
# create a rule for this vm port
self.int_br.install_dvr_to_src_mac(
@ -534,7 +533,7 @@ class OVSDVRNeutronAgent(object):
ovsport.add_subnet(subnet_uuid)
self.local_ports[port.vif_id] = ovsport
vlan_to_use = lvm.vlan
if lvm.network_type == p_const.TYPE_VLAN:
if lvm.network_type == n_const.TYPE_VLAN:
vlan_to_use = lvm.segmentation_id
self.int_br.install_dvr_to_src_mac(
network_type=lvm.network_type,
@ -549,7 +548,7 @@ class OVSDVRNeutronAgent(object):
return
if local_vlan_map.network_type not in (constants.TUNNEL_NETWORK_TYPES
+ [p_const.TYPE_VLAN]):
+ [n_const.TYPE_VLAN]):
LOG.debug("DVR: Port %s is with network_type %s not supported"
" for dvr plumbing", port.vif_id,
local_vlan_map.network_type)
@ -586,7 +585,7 @@ class OVSDVRNeutronAgent(object):
network_type = lvm.network_type
physical_network = lvm.physical_network
vlan_to_use = lvm.vlan
if network_type == p_const.TYPE_VLAN:
if network_type == n_const.TYPE_VLAN:
vlan_to_use = lvm.segmentation_id
# ensure we process for all the subnets laid on this removed port
for sub_uuid in subnet_set:
@ -612,7 +611,7 @@ class OVSDVRNeutronAgent(object):
# this subnet from local_dvr_map, as no dvr (or) csnat
# ports available on this agent anymore
self.local_dvr_map.pop(sub_uuid, None)
if network_type == p_const.TYPE_VLAN:
if network_type == n_const.TYPE_VLAN:
br = self.phys_brs[physical_network]
if network_type in constants.TUNNEL_NETWORK_TYPES:
br = self.tun_br
@ -624,7 +623,7 @@ class OVSDVRNeutronAgent(object):
vlan_tag=lvm.vlan, gateway_mac=subnet_info['gateway_mac'])
ovsport.remove_subnet(sub_uuid)
if lvm.network_type == p_const.TYPE_VLAN:
if lvm.network_type == n_const.TYPE_VLAN:
br = self.phys_brs[physical_network]
if lvm.network_type in constants.TUNNEL_NETWORK_TYPES:
br = self.tun_br
@ -646,7 +645,7 @@ class OVSDVRNeutronAgent(object):
ldm = self.local_dvr_map[sub_uuid]
ldm.remove_compute_ofport(port.vif_id)
vlan_to_use = lvm.vlan
if lvm.network_type == p_const.TYPE_VLAN:
if lvm.network_type == n_const.TYPE_VLAN:
vlan_to_use = lvm.segmentation_id
# first remove this vm port rule
self.int_br.delete_dvr_to_src_mac(
@ -667,7 +666,7 @@ class OVSDVRNeutronAgent(object):
ldm = self.local_dvr_map[sub_uuid]
ldm.set_csnat_ofport(constants.OFPORT_INVALID)
vlan_to_use = lvm.vlan
if lvm.network_type == p_const.TYPE_VLAN:
if lvm.network_type == n_const.TYPE_VLAN:
vlan_to_use = lvm.segmentation_id
# then remove csnat port rule
self.int_br.delete_dvr_to_src_mac(

@ -55,7 +55,6 @@ from neutron.common import constants as c_const
from neutron.common import topics
from neutron.common import utils as n_utils
from neutron.conf.agent import xenapi_conf
from neutron.plugins.common import constants as p_const
from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.drivers.agent import capabilities
from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
@ -148,8 +147,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self.use_veth_interconnection = ovs_conf.use_veth_interconnection
self.veth_mtu = agent_conf.veth_mtu
self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG,
p_const.MAX_VLAN_TAG + 1))
self.available_local_vlans = set(moves.range(n_const.MIN_VLAN_TAG,
n_const.MAX_VLAN_TAG + 1))
self.tunnel_types = agent_conf.tunnel_types or []
self.l2_pop = agent_conf.l2_population
# TODO(ethuleau): Change ARP responder so it's not dependent on the
@ -359,9 +358,9 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self._local_vlan_hints = {}
def _reset_tunnel_ofports(self):
self.tun_br_ofports = {p_const.TYPE_GENEVE: {},
p_const.TYPE_GRE: {},
p_const.TYPE_VXLAN: {}}
self.tun_br_ofports = {n_const.TYPE_GENEVE: {},
n_const.TYPE_GRE: {},
n_const.TYPE_VXLAN: {}}
def setup_rpc(self):
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
@ -676,7 +675,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
"net-id=%(net_uuid)s - tunneling disabled",
{'network_type': network_type,
'net_uuid': net_uuid})
elif network_type == p_const.TYPE_FLAT:
elif network_type == n_const.TYPE_FLAT:
if physical_network in self.phys_brs:
self._local_vlan_for_flat(lvid, physical_network)
else:
@ -685,7 +684,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
"physical_network %(physical_network)s",
{'net_uuid': net_uuid,
'physical_network': physical_network})
elif network_type == p_const.TYPE_VLAN:
elif network_type == n_const.TYPE_VLAN:
if physical_network in self.phys_brs:
self._local_vlan_for_vlan(lvid, physical_network,
segmentation_id)
@ -695,7 +694,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
"physical_network %(physical_network)s",
{'net_uuid': net_uuid,
'physical_network': physical_network})
elif network_type == p_const.TYPE_LOCAL:
elif network_type == n_const.TYPE_LOCAL:
# no flows needed for local networks
pass
else:
@ -732,7 +731,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
for ofport in lvm.tun_ofports:
self.cleanup_tunnel_port(self.tun_br, ofport,
lvm.network_type)
elif lvm.network_type == p_const.TYPE_FLAT:
elif lvm.network_type == n_const.TYPE_FLAT:
if lvm.physical_network in self.phys_brs:
# outbound
br = self.phys_brs[lvm.physical_network]
@ -744,7 +743,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
br.reclaim_local_vlan(
port=self.int_ofports[lvm.physical_network],
segmentation_id=None)
elif lvm.network_type == p_const.TYPE_VLAN:
elif lvm.network_type == n_const.TYPE_VLAN:
if lvm.physical_network in self.phys_brs:
# outbound
br = self.phys_brs[lvm.physical_network]
@ -756,7 +755,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
br.reclaim_local_vlan(
port=self.int_ofports[lvm.physical_network],
segmentation_id=lvm.segmentation_id)
elif lvm.network_type == p_const.TYPE_LOCAL:
elif lvm.network_type == n_const.TYPE_LOCAL:
# no flows needed for local networks
pass
else:

@ -23,7 +23,6 @@ from oslo_config import cfg
from oslo_log import log
from neutron.agent import securitygroups_rpc
from neutron.plugins.common import constants as p_constants
from neutron.plugins.ml2.drivers import mech_agent
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants as a_const
@ -66,8 +65,8 @@ class OpenvswitchMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
def get_allowed_network_types(self, agent):
return (agent['configurations'].get('tunnel_types', []) +
[p_constants.TYPE_LOCAL, p_constants.TYPE_FLAT,
p_constants.TYPE_VLAN])
[constants.TYPE_LOCAL, constants.TYPE_FLAT,
constants.TYPE_VLAN])
def get_mappings(self, agent):
return agent['configurations'].get('bridge_mappings', {})

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants as p_const
from neutron_lib import exceptions as exc
from neutron_lib.objects import exceptions as obj_base
from neutron_lib.plugins.ml2 import api
@ -24,7 +25,6 @@ from neutron.common import exceptions as n_exc
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.db import api as db_api
from neutron.objects.plugins.ml2 import flatallocation as flat_obj
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import helpers
LOG = log.getLogger(__name__)

@ -14,13 +14,13 @@
# under the License.
from neutron_lib import constants as p_const
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
from oslo_log import log
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.objects.plugins.ml2 import geneveallocation as geneve_obj
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_tunnel
LOG = log.getLogger(__name__)

@ -13,13 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants as p_const
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
from oslo_log import log
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.objects.plugins.ml2 import greallocation as gre_obj
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_tunnel
LOG = log.getLogger(__name__)

@ -13,12 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants as p_const
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
from oslo_log import log
from neutron._i18n import _
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import driver_api
LOG = log.getLogger(__name__)

@ -17,6 +17,7 @@ import itertools
import operator
import netaddr
from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
@ -31,7 +32,6 @@ from neutron._i18n import _
from neutron.common import topics
from neutron.db import api as db_api
from neutron.objects import base as base_obj
from neutron.plugins.common import constants as p_const
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.ml2.drivers import helpers

@ -15,6 +15,7 @@
import sys
from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
@ -26,7 +27,6 @@ from neutron._i18n import _
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.db import api as db_api
from neutron.objects.plugins.ml2 import vlanallocation as vlanalloc
from neutron.plugins.common import constants as p_const
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.ml2.drivers import helpers

@ -13,13 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants as p_const
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
from oslo_log import log
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.objects.plugins.ml2 import vxlanallocation as vxlan_obj
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_tunnel
LOG = log.getLogger(__name__)

@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.plugins import constants
from neutron_lib.services import base as service_base
from neutron.db import flavors_db
from neutron.plugins.common import constants
class FlavorsPlugin(service_base.ServicePluginBase,

@ -24,7 +24,6 @@ from oslo_log import log as logging
from neutron._i18n import _
from neutron.db import servicetype_db as st_db
from neutron.plugins.common import constants
from neutron.services import provider_configuration
from neutron.services import service_base
@ -60,7 +59,8 @@ class DriverController(object):
@property
def _flavor_plugin(self):
if not hasattr(self, '_flavor_plugin_ref'):
self._flavor_plugin_ref = directory.get_plugin(constants.FLAVORS)
self._flavor_plugin_ref = directory.get_plugin(
plugin_constants.FLAVORS)
return self._flavor_plugin_ref
@registry.receives(resources.ROUTER, [events.BEFORE_CREATE])

@ -31,7 +31,6 @@ from neutron.common import utils
from neutron.conf.agent import common as agent_config
from neutron.conf import common as common_config
from neutron.conf.plugins.ml2.drivers import ovs_conf
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
import br_int
@ -100,7 +99,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
def create_agent(self, create_tunnels=True, ancillary_bridge=None,
local_ip='192.168.10.1'):
if create_tunnels:
tunnel_types = [p_const.TYPE_VXLAN]
tunnel_types = [n_const.TYPE_VXLAN]
else:
tunnel_types = None
bridge_mappings = ['physnet:%s' % self.br_phys]

@ -32,6 +32,8 @@ from neutron import policy
from neutron.tests.common import helpers
from neutron.tests.functional.pecan_wsgi import test_functional
from neutron.tests.functional.pecan_wsgi import utils as pecan_utils
from neutron.tests.unit import dummy_plugin
_SERVICE_PLUGIN_RESOURCE = 'serviceplugin'
_SERVICE_PLUGIN_COLLECTION = _SERVICE_PLUGIN_RESOURCE + 's'
@ -787,7 +789,7 @@ class TestRequestProcessing(TestRootController):
def test_service_plugin_uri(self):
nm = manager.NeutronManager.get_instance()
nm.path_prefix_resource_mappings['dummy'] = [
nm.path_prefix_resource_mappings[dummy_plugin.RESOURCE_NAME] = [
_SERVICE_PLUGIN_COLLECTION]
response = self.do_request('/v2.0/dummy/serviceplugins.json')
self.assertEqual(200, response.status_int)

@ -33,7 +33,6 @@ import unittest2
from neutron.api.v2 import attributes
from neutron.common import constants as n_const
from neutron.plugins.common import constants as p_const
from neutron.services.logapi.common import constants as log_const
@ -231,7 +230,7 @@ def get_random_port(start=n_const.PORT_RANGE_MIN):
def get_random_vlan():
return random.randint(p_const.MIN_VLAN_TAG, p_const.MAX_VLAN_TAG)
return random.randint(constants.MIN_VLAN_TAG, constants.MAX_VLAN_TAG)
def get_random_ip_version():

@ -21,6 +21,7 @@ from six.moves import http_client as httplib
from webob import exc
from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit import dummy_plugin
class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
@ -129,7 +130,8 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self._test_update_port_binding_profile({})
def test_port_create_portinfo_non_admin(self):
profile_arg = {portbindings.PROFILE: {'dummy': 'dummy'}}
profile_arg = {portbindings.PROFILE: {dummy_plugin.RESOURCE_NAME:
dummy_plugin.RESOURCE_NAME}}
with self.network(set_context=True, tenant_id='test') as net1:
with self.subnet(network=net1) as subnet1:
# succeed without binding:profile
@ -148,7 +150,8 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
pass
def test_port_update_portinfo_non_admin(self):
profile_arg = {portbindings.PROFILE: {'dummy': 'dummy'}}
profile_arg = {portbindings.PROFILE: {dummy_plugin.RESOURCE_NAME:
dummy_plugin.RESOURCE_NAME}}
with self.network() as net1:
with self.subnet(network=net1) as subnet1:
with self.port(subnet=subnet1) as port:

@ -15,6 +15,7 @@
import collections
import mock
from neutron_lib import constants
from neutron_lib import exceptions
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
@ -24,7 +25,6 @@ import testtools
from neutron.agent.common import ovs_lib
from neutron.agent.common import utils
from neutron.conf.agent import common as config
from neutron.plugins.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants as p_const
from neutron.tests import base

@ -39,6 +39,7 @@ from neutron.plugins.common import constants
from neutron import quota
from neutron.tests import base
from neutron.tests.unit.api.v2 import test_base
from neutron.tests.unit import dummy_plugin
from neutron.tests.unit import extension_stubs as ext_stubs
import neutron.tests.unit.extensions
from neutron.tests.unit.extensions import extendedattribute as extattr
@ -160,7 +161,7 @@ class ResourceExtensionTest(base.BaseTestCase):
class DummySvcPlugin(wsgi.Controller):
@classmethod
def get_plugin_type(cls):
return constants.DUMMY
return dummy_plugin.DUMMY_SERVICE_TYPE
def index(self, request, **kwargs):
return "resource index"
@ -209,7 +210,7 @@ class ResourceExtensionTest(base.BaseTestCase):
@classmethod
def get_plugin_type(cls):
return constants.DUMMY
return dummy_plugin.DUMMY_SERVICE_TYPE
res_ext = extensions.ResourceExtension(
'tweedles', DummySvcPlugin(), path_prefix="/dummy_svc")
@ -781,7 +782,7 @@ class PluginAwareExtensionManagerTest(base.BaseTestCase):
return None
stub_plugin = ext_stubs.StubPlugin(supported_extensions=["e1"])
plugin_info = {constants.DUMMY: stub_plugin}
plugin_info = {dummy_plugin.DUMMY_SERVICE_TYPE: stub_plugin}
with mock.patch("neutron.api.extensions.PluginAwareExtensionManager."
"check_if_plugin_extensions_loaded"):
ext_mgr = extensions.PluginAwareExtensionManager('', plugin_info)

@ -43,6 +43,7 @@ from neutron.quota import resource_registry
from neutron.tests import base
from neutron.tests import fake_notifier
from neutron.tests import tools
from neutron.tests.unit import dummy_plugin
from neutron.tests.unit import testlib_api
@ -1134,7 +1135,7 @@ class SubresourceTest(base.BaseTestCase):
SUB_RESOURCES = {}
RESOURCE_ATTRIBUTE_MAP = {}
SUB_RESOURCES['dummy'] = {
SUB_RESOURCES[dummy_plugin.RESOURCE_NAME] = {
'collection_name': 'dummies',
'parent': {'collection_name': 'networks',
'member_name': 'network'}
@ -1148,9 +1149,10 @@ class SubresourceTest(base.BaseTestCase):
'required_by_policy': True,
'is_visible': True}
}
collection_name = SUB_RESOURCES['dummy'].get('collection_name')
resource_name = 'dummy'
parent = SUB_RESOURCES['dummy'].get('parent')
collection_name = SUB_RESOURCES[
dummy_plugin.RESOURCE_NAME].get('collection_name')
resource_name = dummy_plugin.RESOURCE_NAME
parent = SUB_RESOURCES[dummy_plugin.RESOURCE_NAME].get('parent')
params = RESOURCE_ATTRIBUTE_MAP['dummies']
member_actions = {'mactions': 'GET'}
_plugin = directory.get_plugin()
@ -1197,8 +1199,12 @@ class SubresourceTest(base.BaseTestCase):
instance = self.plugin.return_value
tenant_id = _uuid()
body = {'dummy': {'foo': 'bar', 'tenant_id': tenant_id,
'project_id': tenant_id}}
body = {
dummy_plugin.RESOURCE_NAME: {
'foo': 'bar', 'tenant_id': tenant_id,
'project_id': tenant_id
}
}
self.api.post_json('/networks/id1/dummies', body)
instance.create_network_dummy.assert_called_once_with(mock.ANY,
network_id='id1',
@ -1208,7 +1214,7 @@ class SubresourceTest(base.BaseTestCase):
instance = self.plugin.return_value
dummy_id = _uuid()
body = {'dummy': {'foo': 'bar'}}
body = {dummy_plugin.RESOURCE_NAME: {'foo': 'bar'}}
self.api.put_json('/networks/id1' + _get_path('dummies', id=dummy_id),
body)
instance.update_network_dummy.assert_called_once_with(mock.ANY,
@ -1220,7 +1226,7 @@ class SubresourceTest(base.BaseTestCase):
instance = self.plugin.return_value
dummy_id = _uuid()
body = {'dummy': {}}
body = {dummy_plugin.RESOURCE_NAME: {}}
self.api.put_json('/networks/id1' + _get_path('dummies', id=dummy_id),
body)
instance.update_network_dummy.assert_called_once_with(mock.ANY,

@ -30,7 +30,6 @@ import testtools
from neutron.common import exceptions as n_exc
from neutron.common import utils
from neutron.plugins.common import constants as p_const
from neutron.plugins.common import utils as plugin_utils
from neutron.tests import base
from neutron.tests.unit import tests
@ -145,16 +144,16 @@ class TestParseTunnelRangesMixin(object):
class TestGreTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
base.BaseTestCase):
TUN_MIN = p_const.MIN_GRE_ID
TUN_MAX = p_const.MAX_GRE_ID
TYPE = p_const.TYPE_GRE
TUN_MIN = constants.MIN_GRE_ID
TUN_MAX = constants.MAX_GRE_ID
TYPE = constants.TYPE_GRE
class TestVxlanTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
base.BaseTestCase):
TUN_MIN = p_const.MIN_VXLAN_VNI
TUN_MAX = p_const.MAX_VXLAN_VNI
TYPE = p_const.TYPE_VXLAN
TUN_MIN = constants.MIN_VXLAN_VNI
TUN_MAX = constants.MAX_VXLAN_VNI
TYPE = constants.TYPE_VXLAN
class UtilTestParseVlanRanges(base.BaseTestCase):

@ -15,13 +15,13 @@
import mock
from neutron_lib import context
from neutron_lib.plugins import constants as plugin_constants
from oslo_utils import uuidutils
from neutron.common import exceptions as n_exc
from neutron.core_extensions import base as base_core
from neutron.core_extensions import qos as qos_core
from neutron.objects.qos import policy
from neutron.plugins.common import constants as plugin_constants
from neutron.services.qos import qos_consts
from neutron.tests import base

@ -17,6 +17,7 @@ import contextlib
from neutron_lib import constants as n_consts
from neutron_lib import context
from neutron_lib.db import constants as db_const
from neutron_lib.plugins import constants
from oslo_utils import uuidutils
import webob.exc
@ -24,7 +25,6 @@ from neutron.api import extensions
from neutron.common import config
import neutron.extensions
from neutron.extensions import metering
from neutron.plugins.common import constants
from neutron.services.metering import metering_plugin
from neutron.tests.unit.db import test_db_base_plugin_v2

@ -22,11 +22,11 @@ from neutron.api import extensions
from neutron.api.v2 import base
from neutron.db import servicetype_db
from neutron.extensions import servicetype
from neutron.plugins.common import constants
RESOURCE_NAME = "dummy"
COLLECTION_NAME = "%ss" % RESOURCE_NAME
DUMMY_SERVICE_TYPE = "DUMMY"
# Attribute Map for dummy resource
RESOURCE_ATTRIBUTE_MAP = {
@ -53,11 +53,11 @@ class Dummy(object):
@classmethod
def get_name(cls):
return "dummy"
return RESOURCE_NAME
@classmethod
def get_alias(cls):
return "dummy"
return RESOURCE_NAME
@classmethod
def get_description(cls):
@ -70,7 +70,7 @@ class Dummy(object):
@classmethod
def get_resources(cls):
"""Returns Extended Resource for dummy management."""
dummy_inst = directory.get_plugin('DUMMY')
dummy_inst = directory.get_plugin(DUMMY_SERVICE_TYPE)
controller = base.create_resource(
COLLECTION_NAME, RESOURCE_NAME, dummy_inst,
RESOURCE_ATTRIBUTE_MAP[COLLECTION_NAME])
@ -86,9 +86,9 @@ class DummyServicePlugin(service_base.ServicePluginBase):
or VPN will adopt a similar solution.
"""
supported_extension_aliases = ['dummy', servicetype.EXT_ALIAS]
supported_extension_aliases = [RESOURCE_NAME, servicetype.EXT_ALIAS]
path_prefix = "/dummy_svc"
agent_notifiers = {'dummy': 'dummy_agent_notifier'}
agent_notifiers = {RESOURCE_NAME: 'dummy_agent_notifier'}
def __init__(self):
self.svctype_mgr = servicetype_db.ServiceTypeManager.get_instance()
@ -96,7 +96,7 @@ class DummyServicePlugin(service_base.ServicePluginBase):
@classmethod
def get_plugin_type(cls):
return constants.DUMMY
return DUMMY_SERVICE_TYPE
def get_plugin_description(self):
return "Neutron Dummy Service Plugin"
@ -111,7 +111,7 @@ class DummyServicePlugin(service_base.ServicePluginBase):
raise exceptions.NotFound()
def create_dummy(self, context, dummy):
d = dummy['dummy']
d = dummy[RESOURCE_NAME]
d['id'] = uuidutils.generate_uuid()
self.dummys[d['id']] = d
self.svctype_mgr.increase_service_type_refcount(context,

@ -19,6 +19,7 @@ import fixtures
import mock
from neutron_lib import context
from neutron_lib.db import constants as db_const
from neutron_lib.plugins import constants
from oslo_config import cfg
from oslo_utils import uuidutils
from webob import exc
@ -28,12 +29,12 @@ from neutron.db.models import l3 as l3_models
from neutron.db import servicetype_db
from neutron.extensions import flavors
from neutron.objects import flavor as flavor_obj
from neutron.plugins.common import constants
from neutron.services.flavors import flavors_plugin
from neutron.services import provider_configuration as provconf
from neutron.tests import base
from neutron.tests.unit.api.v2 import test_base
from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit import dummy_plugin
from neutron.tests.unit.extensions import base as extension
_uuid = uuidutils.generate_uuid
@ -41,7 +42,7 @@ _get_path = test_base._get_path
_driver = ('neutron.tests.unit.extensions.test_flavors.'
'DummyServiceDriver')
_provider = 'dummy'
_provider = dummy_plugin.RESOURCE_NAME
_long_name = 'x' * (db_const.NAME_FIELD_SIZE + 1)
_long_description = 'x' * (db_const.LONG_DESCRIPTION_FIELD_SIZE + 1)
@ -416,7 +417,7 @@ class DummyServicePlugin(object):
@classmethod
def get_plugin_type(cls):
return constants.DUMMY
return dummy_plugin.DUMMY_SERVICE_TYPE
def get_plugin_description(self):
return "Dummy service plugin, aware of flavors"
@ -426,7 +427,7 @@ class DummyServiceDriver(object):
@staticmethod
def get_service_type():
return constants.DUMMY
return dummy_plugin.DUMMY_SERVICE_TYPE
def __init__(self, plugin):
pass
@ -462,7 +463,7 @@ class FlavorPluginTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
def _create_flavor(self, description=None):
flavor = {'flavor': {'name': 'GOLD',
'service_type': constants.DUMMY,
'service_type': dummy_plugin.DUMMY_SERVICE_TYPE,
'description': description or 'the best flavor',
'enabled': True}}
return self.plugin.create_flavor(self.ctx, flavor), flavor
@ -472,7 +473,8 @@ class FlavorPluginTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
res = flavor_obj.Flavor.get_objects(self.ctx)
self.assertEqual(1, len(res))
self.assertEqual('GOLD', res[0]['name'])
self.assertEqual(constants.DUMMY, res[0]['service_type'])
self.assertEqual(
dummy_plugin.DUMMY_SERVICE_TYPE, res[0]['service_type'])
def test_update_flavor(self):
fl, flavor = self._create_flavor()

@ -43,7 +43,6 @@ from neutron.extensions import ip_allocation
from neutron.extensions import l2_adjacency
from neutron.extensions import segment as ext_segment
from neutron.objects import network
from neutron.plugins.common import constants as p_constants
from neutron.services.segments import db
from neutron.services.segments import exceptions as segment_exc
from neutron.services.segments import placement_client
@ -507,7 +506,7 @@ class HostSegmentMappingTestCase(SegmentTestCase):
network = network['network']
segment = self._test_create_segment(
network_id=network['id'], physical_network=physical_network,
segmentation_id=200, network_type=p_constants.TYPE_VLAN)['segment']
segmentation_id=200, network_type=constants.TYPE_VLAN)['segment']
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin)
segments_host_db = self._get_segments_for_host(host)
@ -533,10 +532,10 @@ class TestMl2HostSegmentMappingNoAgent(HostSegmentMappingTestCase):
network = network['network']
segment = self._test_create_segment(
network_id=network['id'], physical_network='phys_net1',
segmentation_id=200, network_type=p_constants.TYPE_VLAN)['segment']
segmentation_id=200, network_type=constants.TYPE_VLAN)['segment']
self._test_create_segment(
network_id=network['id'], physical_network='phys_net2',
segmentation_id=201, network_type=p_constants.TYPE_VLAN)['segment']
segmentation_id=201, network_type=constants.TYPE_VLAN)['segment']
segments = db.get_segments_with_phys_nets(ctx, physnets)
segment_ids = {segment['id'] for segment in segments}
db.update_segment_host_mapping(ctx, host, segment_ids)
@ -553,7 +552,7 @@ class TestMl2HostSegmentMappingNoAgent(HostSegmentMappingTestCase):
network = network['network']
segment = self._test_create_segment(
network_id=network['id'], physical_network='phys_net1',
segmentation_id=200, network_type=p_constants.TYPE_VLAN)['segment']
segmentation_id=200, network_type=constants.TYPE_VLAN)['segment']
db.map_segment_to_hosts(ctx, segment['id'], hosts)
updated_segment = self.plugin.get_segment(ctx, segment['id'])
self.assertEqual(hosts, set(updated_segment['hosts']))
@ -567,7 +566,7 @@ class TestMl2HostSegmentMappingNoAgent(HostSegmentMappingTestCase):
host = "host%s" % i
segment = self._test_create_segment(
network_id=network_id, physical_network='phys_net%s' % i,
segmentation_id=200 + i, network_type=p_constants.TYPE_VLAN)
segmentation_id=200 + i, network_type=constants.TYPE_VLAN)
db.update_segment_host_mapping(
ctx, host, {segment['segment']['id']})
hosts.add(host)
@ -597,7 +596,7 @@ class TestMl2HostSegmentMappingOVS(HostSegmentMappingTestCase):
network_id=networks[i]['id'],
physical_network=physical_networks[i],
segmentation_id=200,
network_type=p_constants.TYPE_VLAN)['segment'])
network_type=constants.TYPE_VLAN)['segment'])
self._register_agent(host, mappings={physical_networks[0]: 'br-eth-1',
physical_networks[1]: 'br-eth-2'},
plugin=self.plugin)
@ -638,7 +637,7 @@ class TestMl2HostSegmentMappingOVS(HostSegmentMappingTestCase):
network_id=network['id'],
physical_network=physical_network,
segmentation_id=200,
network_type=p_constants.TYPE_VLAN)['segment']
network_type=constants.TYPE_VLAN)['segment']
self._register_agent(host1, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin)
self._register_agent(host2, mappings={physical_network: 'br-eth-1'},
@ -650,7 +649,7 @@ class TestMl2HostSegmentMappingOVS(HostSegmentMappingTestCase):
network_id=network['id'],
physical_network=other_phys_net,
segmentation_id=201,
network_type=p_constants.TYPE_VLAN)['segment']
network_type=constants.TYPE_VLAN)['segment']
self._register_agent(host2, mappings={other_phys_net: 'br-eth-2'},
plugin=self.plugin)
# We should have segment1 map to host1 and segment2 map to host2 now
@ -673,7 +672,7 @@ class TestMl2HostSegmentMappingOVS(HostSegmentMappingTestCase):
network = network['network']
segment2 = self._test_create_segment(
network_id=network['id'], physical_network=physical_network,
segmentation_id=201, network_type=p_constants.TYPE_VLAN)['segment']
segmentation_id=201, network_type=constants.TYPE_VLAN)['segment']
segments_host_db = self._get_segments_for_host(host1)
self.assertEqual(set((segment['id'], segment2['id'])),
set(segments_host_db))
@ -693,7 +692,7 @@ class TestMl2HostSegmentMappingOVS(HostSegmentMappingTestCase):
network = network['network']
self._test_create_segment(
network_id=network['id'], physical_network=physical_network,
segmentation_id=200, network_type=p_constants.TYPE_VLAN)
segmentation_id=200, network_type=constants.TYPE_VLAN)
self._register_agent(host, plugin=self.plugin)
segments_host_db = self._get_segments_for_host(host)
self.assertFalse(segments_host_db)
@ -753,7 +752,7 @@ class TestHostSegmentMappingNoSupportFromPlugin(HostSegmentMappingTestCase):
self._test_create_segment(network_id=network['id'],
physical_network=physical_network,
segmentation_id=200,
network_type=p_constants.TYPE_VLAN)
network_type=constants.TYPE_VLAN)
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin)
segments_host_db = self._get_segments_for_host(host)
@ -831,7 +830,7 @@ class SegmentAwareIpamTestCase(SegmentTestCase):
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network=physnet,
network_type=p_constants.TYPE_VLAN)
network_type=constants.TYPE_VLAN)
return network, segment
def _create_test_subnet_with_segment(self, network, segment,
@ -926,7 +925,7 @@ class TestSegmentAwareIpam(SegmentAwareIpamTestCase):
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network='physnet',
network_type=p_constants.TYPE_VLAN)
network_type=constants.TYPE_VLAN)
# Map the host to the segment
self._setup_host_mappings([(segment['segment']['id'], 'fakehost')])
@ -950,7 +949,7 @@ class TestSegmentAwareIpam(SegmentAwareIpamTestCase):
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network='physnet',
network_type=p_constants.TYPE_VLAN)
network_type=constants.TYPE_VLAN)
self._validate_l2_adjacency(network['network']['id'], is_adjacent=True)
@ -1021,7 +1020,7 @@ class TestSegmentAwareIpam(SegmentAwareIpamTestCase):
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network='physnet',
network_type=p_constants.TYPE_VLAN)
network_type=constants.TYPE_VLAN)
# Create a port with no IP address (since there is no subnet)
port = self._create_deferred_ip_port(network)
@ -1055,7 +1054,7 @@ class TestSegmentAwareIpam(SegmentAwareIpamTestCase):
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network='physnet',
network_type=p_constants.TYPE_VLAN)
network_type=constants.TYPE_VLAN)
with self.subnet(network=network,
segment_id=segment['segment']['id']):
pass
@ -1115,7 +1114,7 @@ class TestSegmentAwareIpam(SegmentAwareIpamTestCase):
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network='physnet',
network_type=p_constants.TYPE_VLAN)
network_type=constants.TYPE_VLAN)
# Map the host to the segment
self._setup_host_mappings([(segment['segment']['id'], 'fakehost')])

@ -16,6 +16,7 @@
import mock
from neutron_lib import context
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import constants
from oslo_config import cfg
from oslo_utils import uuidutils
import webob.exc as webexc
@ -25,7 +26,6 @@ from neutron.api import extensions
from neutron.db.models import servicetype as st_model
from neutron.db import servicetype_db as st_db
from neutron.extensions import servicetype
from neutron.plugins.common import constants
from neutron.services import provider_configuration as provconf
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.api.v2 import test_base
@ -98,7 +98,7 @@ class ServiceTypeManagerTestCase(testlib_api.SqlTestCase):
def test_get_default_provider(self):
self._set_override([constants.LOADBALANCER +
':lbaas1:driver_path:default',
constants.DUMMY +
dp.DUMMY_SERVICE_TYPE +
':lbaas2:driver_path2'])
# can pass None as a context
p = self.manager.get_default_service_provider(None,
@ -111,12 +111,14 @@ class ServiceTypeManagerTestCase(testlib_api.SqlTestCase):
self.assertRaises(
provconf.DefaultServiceProviderNotFound,
self.manager.get_default_service_provider,
None, constants.DUMMY
None, dp.DUMMY_SERVICE_TYPE
)
def test_get_provider_names_by_resource_ids(self):
self._set_override([constants.DUMMY + ':dummy1:driver_path',
constants.DUMMY + ':dummy2:driver_path2'])
self._set_override([dp.DUMMY_SERVICE_TYPE +
':dummy1:driver_path',
dp.DUMMY_SERVICE_TYPE +
':dummy2:driver_path2'])
ctx = context.get_admin_context()
test_data = [{'provider_name': 'dummy1',
'resource_id': uuidutils.generate_uuid()},
@ -124,11 +126,14 @@ class ServiceTypeManagerTestCase(testlib_api.SqlTestCase):
'resource_id': uuidutils.generate_uuid()},
{'provider_name': 'dummy2',
'resource_id': uuidutils.generate_uuid()}]
self.manager.add_resource_association(ctx, constants.DUMMY,
self.manager.add_resource_association(ctx,
dp.DUMMY_SERVICE_TYPE,
**test_data[0])
self.manager.add_resource_association(ctx, constants.DUMMY,
self.manager.add_resource_association(ctx,
dp.DUMMY_SERVICE_TYPE,
**test_data[1])
self.manager.add_resource_association(ctx, constants.DUMMY,
self.manager.add_resource_association(ctx,
dp.DUMMY_SERVICE_TYPE,
**test_data[2])
names_by_id = self.manager.get_provider_names_by_resource_ids(
ctx, [td['resource_id'] for td in test_data])
@ -139,7 +144,7 @@ class ServiceTypeManagerTestCase(testlib_api.SqlTestCase):
def test_add_resource_association(self):
self._set_override([constants.LOADBALANCER +
':lbaas1:driver_path:default',
constants.DUMMY +
dp.DUMMY_SERVICE_TYPE +
':lbaas2:driver_path2'])
ctx = context.get_admin_context()
self.manager.add_resource_association(ctx,
@ -155,7 +160,7 @@ class ServiceTypeManagerTestCase(testlib_api.SqlTestCase):
def test_invalid_resource_association(self):
self._set_override([constants.LOADBALANCER +
':lbaas1:driver_path:default',
constants.DUMMY +
dp.DUMMY_SERVICE_TYPE +
':lbaas2:driver_path2'])
ctx = context.get_admin_context()
self.assertRaises(provconf.ServiceProviderNotFound,
@ -224,7 +229,7 @@ class ServiceTypeManagerExtTestCase(ServiceTypeExtensionTestCaseBase):
provconf.NeutronModule, 'service_providers').start()
service_providers = [
constants.LOADBALANCER + ':lbaas:driver_path',
constants.DUMMY + ':dummy:dummy_dr'
dp.DUMMY_SERVICE_TYPE + ':dummy:dummy_dr'
]
self.service_providers.return_value = service_providers
# Blank out service type manager instance

@ -14,6 +14,7 @@
# limitations under the License.
import mock
from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
@ -22,7 +23,6 @@ from six import moves
import testtools
from testtools import matchers
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_tunnel
TUNNEL_IP_ONE = "10.10.10.10"

@ -23,7 +23,6 @@ from neutron.agent.linux import bridge_lib
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import exceptions
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
import constants as lconst
@ -80,24 +79,22 @@ class TestLinuxBridge(base.BaseTestCase):
BRIDGE_MAPPINGS, INTERFACE_MAPPINGS)
def test_ensure_physical_in_bridge_invalid(self):
result = self.linux_bridge.ensure_physical_in_bridge('network_id',
p_const.TYPE_VLAN,
'physnetx',
7)
result = self.linux_bridge.ensure_physical_in_bridge(
'network_id', constants.TYPE_VLAN, 'physnetx', 7)
self.assertFalse(result)
def test_ensure_physical_in_bridge_flat(self):
with mock.patch.object(self.linux_bridge,
'ensure_flat_bridge') as flat_bridge_func:
self.linux_bridge.ensure_physical_in_bridge(
'network_id', p_const.TYPE_FLAT, 'physnet1', None)
'network_id', constants.TYPE_FLAT, 'physnet1', None)
self.assertTrue(flat_bridge_func.called)
def test_ensure_physical_in_bridge_vlan(self):
with mock.patch.object(self.linux_bridge,
'ensure_vlan_bridge') as vlan_bridge_func:
self.linux_bridge.ensure_physical_in_bridge(
'network_id', p_const.TYPE_VLAN, 'physnet1', 7)
'network_id', constants.TYPE_VLAN, 'physnet1', 7)
self.assertTrue(vlan_bridge_func.called)
def test_ensure_physical_in_bridge_vxlan(self):
@ -208,14 +205,14 @@ class TestLinuxBridgeManager(base.BaseTestCase):
self.lbm.get_tap_device_name(if_id))
def test_get_vxlan_device_name(self):
vn_id = p_const.MAX_VXLAN_VNI
vn_id = constants.MAX_VXLAN_VNI
self.assertEqual("vxlan-" + str(vn_id),
self.lbm.get_vxlan_device_name(vn_id))
self.assertIsNone(self.lbm.get_vxlan_device_name(vn_id + 1))
def test_get_vxlan_group(self):
cfg.CONF.set_override('vxlan_group', '239.1.2.3/24', 'VXLAN')
vn_id = p_const.MAX_VXLAN_VNI
vn_id = constants.MAX_VXLAN_VNI
self.assertEqual('239.1.2.255', self.lbm.get_vxlan_group(vn_id))
vn_id = 256
self.assertEqual('239.1.2.0', self.lbm.get_vxlan_group(vn_id))
@ -265,7 +262,7 @@ class TestLinuxBridgeManager(base.BaseTestCase):
cfg.CONF.set_override('local_ip', LOCAL_IPV6, 'VXLAN')
self.lbm.local_ip = LOCAL_IPV6
cfg.CONF.set_override('vxlan_group', VXLAN_GROUPV6, 'VXLAN')
vn_id = p_const.MAX_VXLAN_VNI
vn_id = constants.MAX_VXLAN_VNI
self.assertEqual('ff05::ff', self.lbm.get_vxlan_group(vn_id))
vn_id = 256
self.assertEqual('ff05::', self.lbm.get_vxlan_group(vn_id))
@ -500,18 +497,18 @@ class TestLinuxBridgeManager(base.BaseTestCase):
def test_ensure_physical_in_bridge(self):
self.assertFalse(
self.lbm.ensure_physical_in_bridge("123", p_const.TYPE_VLAN,
self.lbm.ensure_physical_in_bridge("123", constants.TYPE_VLAN,
"phys", "1")
)
with mock.patch.object(self.lbm, "ensure_flat_bridge") as flbr_fn:
self.assertTrue(
self.lbm.ensure_physical_in_bridge("123", p_const.TYPE_FLAT,
self.lbm.ensure_physical_in_bridge("123", constants.TYPE_FLAT,
"physnet1", None)
)
self.assertTrue(flbr_fn.called)
with mock.patch.object(self.lbm, "ensure_vlan_bridge") as vlbr_fn:
self.assertTrue(
self.lbm.ensure_physical_in_bridge("123", p_const.TYPE_VLAN,
self.lbm.ensure_physical_in_bridge("123", constants.TYPE_VLAN,
"physnet1", "1")
)
self.assertTrue(vlbr_fn.called)
@ -519,14 +516,14 @@ class TestLinuxBridgeManager(base.BaseTestCase):
with mock.patch.object(self.lbm, "ensure_vxlan_bridge") as vlbr_fn:
self.lbm.vxlan_mode = lconst.VXLAN_MCAST
self.assertTrue(
self.lbm.ensure_physical_in_bridge("123", p_const.TYPE_VXLAN,
self.lbm.ensure_physical_in_bridge("123", constants.TYPE_VXLAN,
"physnet1", "1")
)
self.assertTrue(vlbr_fn.called)
def test_ensure_physical_in_bridge_with_existed_brq(self):
with mock.patch.object(linuxbridge_neutron_agent.LOG, 'error') as log:
self.lbm.ensure_physical_in_bridge("123", p_const.TYPE_FLAT,
self.lbm.ensure_physical_in_bridge("123", constants.TYPE_FLAT,
"physnet9", "1")
self.assertEqual(1, log.call_count)
@ -535,7 +532,7 @@ class TestLinuxBridgeManager(base.BaseTestCase):
with mock.patch.object(self.lbm, "_add_tap_interface",
side_effect=RuntimeError("No such dev")):
self.assertFalse(self.lbm.add_tap_interface("123",
p_const.TYPE_VLAN,
constants.TYPE_VLAN,
"physnet1", None,
"tap1", "foo", None))
@ -544,23 +541,21 @@ class TestLinuxBridgeManager(base.BaseTestCase):
with mock.patch.object(self.lbm, "_add_tap_interface",
side_effect=RuntimeError("No more fuel")):
self.assertRaises(RuntimeError, self.lbm.add_tap_interface, "123",
p_const.TYPE_VLAN, "physnet1", None, "tap1",
constants.TYPE_VLAN, "physnet1", None, "tap1",
"foo", None)
def test_add_tap_interface_owner_compute(self):
with mock.patch.object(ip_lib, "device_exists"):
with mock.patch.object(self.lbm, "ensure_local_bridge"):
self.assertTrue(self.lbm.add_tap_interface("123",
p_const.TYPE_LOCAL,
"physnet1", None,
"tap1",
"compute:1", None))
self.assertTrue(self.lbm.add_tap_interface(
"123", constants.TYPE_LOCAL, "physnet1",
None, "tap1", "compute:1", None))
def _test_add_tap_interface(self, dev_owner_prefix):
with mock.patch.object(ip_lib, "device_exists") as de_fn:
de_fn.return_value = False
self.assertFalse(
self.lbm.add_tap_interface("123", p_const.TYPE_VLAN,
self.lbm.add_tap_interface("123", constants.TYPE_VLAN,
"physnet1", "1", "tap1",
dev_owner_prefix, None))
@ -574,41 +569,29 @@ class TestLinuxBridgeManager(base.BaseTestCase):
"get_interface_bridge") as get_br:
bridge_device.addif.retun_value = False
get_br.return_value = True
self.assertTrue(self.lbm.add_tap_interface("123",
p_const.TYPE_LOCAL,
"physnet1", None,
"tap1",
dev_owner_prefix,
None))
self.assertTrue(self.lbm.add_tap_interface(
"123", constants.TYPE_LOCAL, "physnet1", None,
"tap1", dev_owner_prefix, None))
en_fn.assert_called_with("123", "brq123")
self.lbm.bridge_mappings = {"physnet1": "brq999"}
self.assertTrue(self.lbm.add_tap_interface("123",
p_const.TYPE_LOCAL,
"physnet1", None,
"tap1",
dev_owner_prefix,
8765))
self.assertTrue(self.lbm.add_tap_interface(
"123", constants.TYPE_LOCAL, "physnet1", None,
"tap1", dev_owner_prefix, 8765))
set_tap.assert_called_with('tap1', 8765)
en_fn.assert_called_with("123", "brq999")
get_br.return_value = False
bridge_device.addif.retun_value = True
self.assertFalse(self.lbm.add_tap_interface("123",
p_const.TYPE_LOCAL,
"physnet1", None,
"tap1",
dev_owner_prefix,
None))
self.assertFalse(self.lbm.add_tap_interface(
"123", constants.TYPE_LOCAL, "physnet1",
None, "tap1", dev_owner_prefix, None))
with mock.patch.object(self.lbm,
"ensure_physical_in_bridge") as ens_fn:
ens_fn.return_value = False
self.assertFalse(self.lbm.add_tap_interface("123",
p_const.TYPE_VLAN,
"physnet1", "1",
"tap1",
dev_owner_prefix,
None))
self.assertFalse(self.lbm.add_tap_interface(
"123", constants.TYPE_VLAN, "physnet1", "1",
"tap1", dev_owner_prefix, None))
def test_add_tap_interface_owner_network(self):
self._test_add_tap_interface(constants.DEVICE_OWNER_NETWORK_PREFIX)
@ -617,11 +600,12 @@ class TestLinuxBridgeManager(base.BaseTestCase):
self._test_add_tap_interface(constants.DEVICE_OWNER_NEUTRON_PREFIX)
def test_plug_interface(self):
segment = amb.NetworkSegment(p_const.TYPE_VLAN, "physnet-1", "1", 1777)
segment = amb.NetworkSegment(
constants.TYPE_VLAN, "physnet-1", "1", 1777)
with mock.patch.object(self.lbm, "add_tap_interface") as add_tap:
self.lbm.plug_interface("123", segment, "tap234",
constants.DEVICE_OWNER_NETWORK_PREFIX)
add_tap.assert_called_with("123", p_const.TYPE_VLAN, "physnet-1",
add_tap.assert_called_with("123", constants.TYPE_VLAN, "physnet-1",
"1", "tap234",
constants.DEVICE_OWNER_NETWORK_PREFIX,
1777)

@ -24,7 +24,6 @@ from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from neutron.agent import securitygroups_rpc
from neutron.plugins.common import constants as p_constants
from neutron.plugins.ml2.drivers import mech_agent
@ -56,8 +55,8 @@ class FakeAgentMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
def get_allowed_network_types(self, agent):
return (agent['configurations'].get('tunnel_types', []) +
[p_constants.TYPE_LOCAL, p_constants.TYPE_FLAT,
p_constants.TYPE_VLAN])
[constants.TYPE_LOCAL, constants.TYPE_FLAT,
constants.TYPE_VLAN])
def get_mappings(self, agent):
return dict(agent['configurations'].get('interface_mappings', {}))

@ -19,7 +19,6 @@ from neutron_lib import constants
from neutron_lib.plugins.ml2 import api
import testtools
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.mech_sriov.mech_driver \
import exceptions as exc
from neutron.plugins.ml2.drivers.mech_sriov.mech_driver import mech_driver
@ -75,10 +74,10 @@ class SriovSwitchMechGenericTestCase(SriovNicSwitchMechanismBaseTestCase,
def test_check_segment(self):
"""Validate the check_segment call."""
segment = {'api.NETWORK_TYPE': ""}
segment[api.NETWORK_TYPE] = p_const.TYPE_VLAN
segment[api.NETWORK_TYPE] = constants.TYPE_VLAN
self.assertTrue(self.driver.check_segment_for_agent(segment))
# Validate a network type not currently supported
segment[api.NETWORK_TYPE] = p_const.TYPE_GRE
segment[api.NETWORK_TYPE] = constants.TYPE_GRE
self.assertFalse(self.driver.check_segment_for_agent(segment))
def test_check_segment_allows_supported_network_types(self):
@ -169,7 +168,7 @@ class SriovSwitchMechVifDetailsTestCase(SriovNicSwitchMechanismBaseTestCase):
self.assertEqual(1234, vlan_id)
def test_get_vif_details_for_flat_network(self):
segment = {api.NETWORK_TYPE: p_const.TYPE_FLAT}
segment = {api.NETWORK_TYPE: constants.TYPE_FLAT}
vif_details = self.driver._get_vif_details(segment)
vlan_id = vif_details[portbindings.VIF_DETAILS_VLAN]
self.assertEqual('0', vlan_id)

@ -29,7 +29,6 @@ from neutron.agent.linux import async_process
from neutron.agent.linux import ip_lib
from neutron.common import constants as c_const
from neutron.common import rpc as n_rpc
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
@ -83,7 +82,7 @@ class ValidateTunnelTypes(ovs_test_base.OVSAgentConfigTestBase):
def test_validate_tunnel_types_succeeds(self):
cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS')
cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE],
cfg.CONF.set_override('tunnel_types', [n_const.TYPE_GRE],
group='AGENT')
self.mod_agent.validate_tunnel_config(cfg.CONF.AGENT.tunnel_types,
cfg.CONF.OVS.local_ip)
@ -217,13 +216,13 @@ class TestOvsNeutronAgent(object):
self.agent.agent_state['agent_type'])
def test_agent_available_local_vlans(self):
expected = [p_const.MIN_VLAN_TAG,
p_const.MIN_VLAN_TAG + 1,
p_const.MAX_VLAN_TAG - 1,
p_const.MAX_VLAN_TAG]
exception = [p_const.MIN_VLAN_TAG - 1,
p_const.MAX_VLAN_TAG + 1,
p_const.MAX_VLAN_TAG + 2]
expected = [n_const.MIN_VLAN_TAG,
n_const.MIN_VLAN_TAG + 1,
n_const.MAX_VLAN_TAG - 1,
n_const.MAX_VLAN_TAG]
exception = [n_const.MIN_VLAN_TAG - 1,
n_const.MAX_VLAN_TAG + 1,
n_const.MAX_VLAN_TAG + 2]
available_vlan = self.agent.available_local_vlans
for tag in expected:
self.assertIn(tag, available_vlan)
@ -1652,14 +1651,14 @@ class TestOvsNeutronAgent(object):
mock.patch.object(self.mod_agent.LOG, 'error') as log_error_fn:
self.agent.local_ip = '1.2.3.4'
ofport = self.agent._setup_tunnel_port(
self.agent.tun_br, 'gre-1', remote_ip, p_const.TYPE_GRE)
self.agent.tun_br, 'gre-1', remote_ip, n_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
'gre-1', remote_ip, self.agent.local_ip, p_const.TYPE_GRE,
'gre-1', remote_ip, self.agent.local_ip, n_const.TYPE_GRE,
self.agent.vxlan_udp_port, self.agent.dont_fragment,
self.agent.tunnel_csum)
log_error_fn.assert_called_once_with(
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': p_const.TYPE_GRE, 'ip': remote_ip})
{'type': n_const.TYPE_GRE, 'ip': remote_ip})
self.assertEqual(0, ofport)
def test_setup_tunnel_port_invalid_address_mismatch(self):
@ -1667,7 +1666,7 @@ class TestOvsNeutronAgent(object):
with mock.patch.object(self.mod_agent.LOG, 'error') as log_error_fn:
self.agent.local_ip = '1.2.3.4'
ofport = self.agent._setup_tunnel_port(
self.agent.tun_br, 'gre-1', remote_ip, p_const.TYPE_GRE)
self.agent.tun_br, 'gre-1', remote_ip, n_const.TYPE_GRE)
log_error_fn.assert_called_once_with(
_("IP version mismatch, cannot create tunnel: "
"local_ip=%(lip)s remote_ip=%(rip)s"),
@ -1679,7 +1678,7 @@ class TestOvsNeutronAgent(object):
with mock.patch.object(self.mod_agent.LOG, 'error') as log_error_fn:
self.agent.local_ip = '1.2.3.4.5'
ofport = self.agent._setup_tunnel_port(
self.agent.tun_br, 'gre-1', remote_ip, p_const.TYPE_GRE)
self.agent.tun_br, 'gre-1', remote_ip, n_const.TYPE_GRE)
log_error_fn.assert_called_once_with(
_("Invalid local or remote IP, cannot create tunnel: "
"local_ip=%(lip)s remote_ip=%(rip)s"),
@ -1697,14 +1696,14 @@ class TestOvsNeutronAgent(object):
self.agent.tunnel_csum = False
self.agent.local_ip = '2.3.4.5'
ofport = self.agent._setup_tunnel_port(
self.agent.tun_br, 'gre-1', remote_ip, p_const.TYPE_GRE)
self.agent.tun_br, 'gre-1', remote_ip, n_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
'gre-1', remote_ip, self.agent.local_ip, p_const.TYPE_GRE,
'gre-1', remote_ip, self.agent.local_ip, n_const.TYPE_GRE,
self.agent.vxlan_udp_port, self.agent.dont_fragment,
self.agent.tunnel_csum)
log_error_fn.assert_called_once_with(
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': p_const.TYPE_GRE, 'ip': remote_ip})
{'type': n_const.TYPE_GRE, 'ip': remote_ip})
self.assertEqual(0, ofport)
def test_setup_tunnel_port_error_negative_tunnel_csum(self):
@ -1718,14 +1717,14 @@ class TestOvsNeutronAgent(object):
self.agent.tunnel_csum = True
self.agent.local_ip = '2.3.4.5'
ofport = self.agent._setup_tunnel_port(
self.agent.tun_br, 'gre-1', remote_ip, p_const.TYPE_GRE)
self.agent.tun_br, 'gre-1', remote_ip, n_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
'gre-1', remote_ip, self.agent.local_ip, p_const.TYPE_GRE,
'gre-1', remote_ip, self.agent.local_ip, n_const.TYPE_GRE,
self.agent.vxlan_udp_port, self.agent.dont_fragment,
self.agent.tunnel_csum)
log_error_fn.assert_called_once_with(
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': p_const.TYPE_GRE, 'ip': remote_ip})
{'type': n_const.TYPE_GRE, 'ip': remote_ip})
self.assertEqual(0, ofport)
def test_tunnel_sync_with_ml2_plugin(self):
@ -2418,7 +2417,7 @@ class TestOvsDvrNeutronAgent(object):
self._compute_port.vif_mac = '77:88:99:00:11:22'
physical_network = self._physical_network
segmentation_id = self._segmentation_id
network_type = p_const.TYPE_VLAN
network_type = n_const.TYPE_VLAN
int_br = mock.create_autospec(self.agent.int_br)
tun_br = mock.create_autospec(self.agent.tun_br)
phys_br = mock.create_autospec(self.br_phys_cls('br-phys'))
@ -2504,7 +2503,7 @@ class TestOvsDvrNeutronAgent(object):
else:
gateway_ip = '2001:100::1'
cidr = '2001:100::0/64'
network_type = p_const.TYPE_VXLAN
network_type = n_const.TYPE_VXLAN
self._port.vif_mac = gateway_mac = 'aa:bb:cc:11:22:33'
self._compute_port.vif_mac = '77:88:99:00:11:22'
physical_network = self._physical_network

@ -24,7 +24,6 @@ import six
from neutron.agent.common import ip_lib
from neutron.agent.common import ovs_lib
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
import ovs_test_base
@ -53,7 +52,7 @@ FIXED_IPS = [{'subnet_id': 'my-subnet-uuid',
'ip_address': '1.1.1.1'}]
VM_DEVICE_OWNER = n_const.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
TUN_OFPORTS = {p_const.TYPE_GRE: {'ip1': '11', 'ip2': '12'}}
TUN_OFPORTS = {n_const.TYPE_GRE: {'ip1': '11', 'ip2': '12'}}
BCAST_MAC = "01:00:00:00:00:00/01:00:00:00:00:00"
UCAST_MAC = "00:00:00:00:00:00/01:00:00:00:00:00"
@ -353,11 +352,11 @@ class TunnelTest(object):
self._verify_mock_calls()
def test_provision_local_vlan(self):
ofports = list(TUN_OFPORTS[p_const.TYPE_GRE].values())
ofports = list(TUN_OFPORTS[n_const.TYPE_GRE].values())
self.mock_tun_bridge_expected += [
mock.call.install_flood_to_tun(LV_ID, LS_ID, ofports),
mock.call.provision_local_vlan(
network_type=p_const.TYPE_GRE,
network_type=n_const.TYPE_GRE,
lvid=LV_ID,
segmentation_id=LS_ID),
]
@ -365,7 +364,7 @@ class TunnelTest(object):
a = self._build_agent()
a.available_local_vlans = set([LV_ID])
a.tun_br_ofports = TUN_OFPORTS
a.provision_local_vlan(NET_UUID, p_const.TYPE_GRE, None, LS_ID)
a.provision_local_vlan(NET_UUID, n_const.TYPE_GRE, None, LS_ID)
self._verify_mock_calls()
def test_provision_local_vlan_flat(self):
@ -386,12 +385,12 @@ class TunnelTest(object):
a.phys_brs['net1'] = self.mock_map_tun_bridge
a.phys_ofports['net1'] = self.MAP_TUN_PHY_OFPORT
a.int_ofports['net1'] = self.INT_OFPORT
a.provision_local_vlan(NET_UUID, p_const.TYPE_FLAT, 'net1', LS_ID)
a.provision_local_vlan(NET_UUID, n_const.TYPE_FLAT, 'net1', LS_ID)
self._verify_mock_calls()
def test_provision_local_vlan_flat_fail(self):
a = self._build_agent()
a.provision_local_vlan(NET_UUID, p_const.TYPE_FLAT, 'net2', LS_ID)
a.provision_local_vlan(NET_UUID, n_const.TYPE_FLAT, 'net2', LS_ID)
self._verify_mock_calls()
def test_provision_local_vlan_vlan(self):
@ -411,12 +410,12 @@ class TunnelTest(object):
a.phys_brs['net1'] = self.mock_map_tun_bridge
a.phys_ofports['net1'] = self.MAP_TUN_PHY_OFPORT
a.int_ofports['net1'] = self.INT_OFPORT
a.provision_local_vlan(NET_UUID, p_const.TYPE_VLAN, 'net1', LS_ID)
a.provision_local_vlan(NET_UUID, n_const.TYPE_VLAN, 'net1', LS_ID)
self._verify_mock_calls()
def test_provision_local_vlan_vlan_fail(self):
a = self._build_agent()
a.provision_local_vlan(NET_UUID, p_const.TYPE_VLAN, 'net2', LS_ID)
a.provision_local_vlan(NET_UUID, n_const.TYPE_VLAN, 'net2', LS_ID)
self._verify_mock_calls()
def test_reclaim_local_vlan(self):
@ -534,7 +533,7 @@ class TunnelTest(object):
a = self._build_agent()
a.tunnel_update(
mock.sentinel.ctx, tunnel_ip='10.0.10.1',
tunnel_type=p_const.TYPE_GRE)
tunnel_type=n_const.TYPE_GRE)
self._verify_mock_calls()
def test_tunnel_update_self(self):

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
@ -20,7 +21,6 @@ from oslo_config import cfg
from neutron.common import exceptions as n_exc
from neutron.objects.plugins.ml2 import flatallocation as flat_obj
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_flat
from neutron.tests import base
from neutron.tests.unit import testlib_api

@ -13,7 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.plugins.common import constants as p_const
from neutron_lib import constants as p_const
from neutron.plugins.ml2.drivers import type_geneve
from neutron.tests.unit.plugins.ml2.drivers import base_type_tunnel
from neutron.tests.unit.plugins.ml2 import test_rpc

@ -13,7 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.plugins.common import constants as p_const
from neutron_lib import constants as p_const
from neutron.plugins.ml2.drivers import type_gre
from neutron.tests.unit.plugins.ml2.drivers import base_type_tunnel
from neutron.tests.unit.plugins.ml2 import test_rpc

@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants as p_const
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_local
from neutron.tests import base

@ -14,6 +14,7 @@
# under the License.
import mock
from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
@ -22,7 +23,6 @@ from testtools import matchers
from neutron.db import api as db_api
from neutron.objects.plugins.ml2 import vlanallocation as vlan_alloc_obj
from neutron.plugins.common import constants as p_const
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.ml2.drivers import type_vlan
from neutron.tests.unit import testlib_api

@ -13,7 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.plugins.common import constants as p_const
from neutron_lib import constants as p_const
from neutron.plugins.ml2.drivers import type_vxlan
from neutron.tests.unit.plugins.ml2.drivers import base_type_tunnel
from neutron.tests.unit.plugins.ml2 import test_rpc

@ -48,7 +48,6 @@ from neutron.extensions import external_net
from neutron.extensions import multiprovidernet as mpnet
from neutron.objects import base as base_obj
from neutron.objects import router as l3_obj
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import db as ml2_db
from neutron.plugins.ml2 import driver_context
@ -392,7 +391,7 @@ class TestExternalNetwork(Ml2PluginV2TestCase):
network = self._create_external_network()
# For external network, expected network type to be
# tenant_network_types which is by default 'local'.
self.assertEqual(p_const.TYPE_LOCAL,
self.assertEqual(constants.TYPE_LOCAL,
network['network'][pnet.NETWORK_TYPE])
# No physical network specified, expected 'None'.
self.assertIsNone(network['network'][pnet.PHYSICAL_NETWORK])
@ -403,12 +402,12 @@ class TestExternalNetwork(Ml2PluginV2TestCase):
def test_external_network_type_vlan(self):
cfg.CONF.set_default('external_network_type',
p_const.TYPE_VLAN,
constants.TYPE_VLAN,
group='ml2')
network = self._create_external_network()
# For external network, expected network type to be 'vlan'.
self.assertEqual(p_const.TYPE_VLAN,
self.assertEqual(constants.TYPE_VLAN,
network['network'][pnet.NETWORK_TYPE])
# Physical network is expected.
self.assertIsNotNone(network['network'][pnet.PHYSICAL_NETWORK])

@ -16,11 +16,11 @@ import mock
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions as lib_exc
from neutron_lib.plugins import constants as p_cons
from neutron_lib.plugins import directory
from oslo_utils import uuidutils
import testtools
from neutron.plugins.common import constants as p_cons
from neutron.services.l3_router.service_providers import driver_controller
from neutron.services import provider_configuration
from neutron.tests import base

@ -14,6 +14,7 @@
import mock
from neutron_lib import context
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_utils import uuidutils
@ -25,7 +26,6 @@ from neutron.db.metering import metering_rpc
from neutron.db.models import agent as agent_model
from neutron.extensions import l3 as ext_l3
from neutron.extensions import metering as ext_metering
from neutron.plugins.common import constants
from neutron.tests.common import helpers
from neutron.tests import tools
from neutron.tests.unit.db.metering import test_metering_db

@ -13,6 +13,7 @@
import mock
from neutron_lib import context
from neutron_lib import exceptions as lib_exc
from neutron_lib.plugins import constants as plugins_constants
from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_utils import uuidutils
@ -23,7 +24,6 @@ from neutron import manager
from neutron.objects import base as base_object
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron.plugins.common import constants as plugins_constants
from neutron.services.qos import qos_consts
from neutron.services.qos import qos_plugin
from neutron.tests.unit.services.qos import base

@ -17,10 +17,10 @@ import shutil
import mock
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import constants
from oslo_config import cfg
from neutron import manager
from neutron.plugins.common import constants
from neutron.services import provider_configuration as provconf
from neutron.tests import base

@ -31,7 +31,7 @@ DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
class MultiServiceCorePlugin(object):
supported_extension_aliases = ['lbaas', 'dummy']
supported_extension_aliases = ['lbaas', dummy_plugin.Dummy.get_alias()]
class CorePluginWithAgentNotifiers(object):
@ -42,19 +42,29 @@ class CorePluginWithAgentNotifiers(object):
class NeutronManagerTestCase(base.BaseTestCase):
def setUp(self):
ext_mapping = constants.EXT_TO_SERVICE_MAPPING
if dummy_plugin.Dummy.get_alias() not in ext_mapping:
ext_mapping[dummy_plugin.Dummy.get_alias()] = (
dummy_plugin.DUMMY_SERVICE_TYPE)
super(NeutronManagerTestCase, self).setUp()
self.config_parse()
self.setup_coreplugin(load_plugins=False)
self.useFixture(
fixtures.MonkeyPatch('neutron.manager.NeutronManager._instance'))
def tearDown(self):
ext_mapping = constants.EXT_TO_SERVICE_MAPPING
if dummy_plugin.Dummy.get_alias() in ext_mapping:
del ext_mapping[dummy_plugin.Dummy.get_alias()]
super(NeutronManagerTestCase, self).tearDown()
def test_service_plugin_is_loaded(self):
cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
cfg.CONF.set_override("service_plugins",
["neutron.tests.unit.dummy_plugin."
"DummyServicePlugin"])
manager.init()
plugin = directory.get_plugin(constants.DUMMY)
plugin = directory.get_plugin(dummy_plugin.DUMMY_SERVICE_TYPE)
self.assertIsInstance(
plugin, dummy_plugin.DummyServicePlugin,
@ -62,9 +72,10 @@ class NeutronManagerTestCase(base.BaseTestCase):
def test_service_plugin_by_name_is_loaded(self):
cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
cfg.CONF.set_override("service_plugins", ["dummy"])
cfg.CONF.set_override("service_plugins",
[dummy_plugin.Dummy.get_alias()])
manager.init()
plugin = directory.get_plugin(constants.DUMMY)
plugin = directory.get_plugin(dummy_plugin.DUMMY_SERVICE_TYPE)
self.assertIsInstance(
plugin, dummy_plugin.DummyServicePlugin,
@ -78,17 +89,20 @@ class NeutronManagerTestCase(base.BaseTestCase):
"DummyServicePlugin"])
cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
e = self.assertRaises(ValueError, manager.NeutronManager.get_instance)
self.assertIn(constants.DUMMY, str(e))
self.assertIn(dummy_plugin.DUMMY_SERVICE_TYPE, str(e))
def test_multiple_plugins_by_name_specified_for_service_type(self):
cfg.CONF.set_override("service_plugins", ["dummy", "dummy"])
cfg.CONF.set_override("service_plugins",
[dummy_plugin.Dummy.get_alias(),
dummy_plugin.Dummy.get_alias()])
cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
self.assertRaises(ValueError, manager.NeutronManager.get_instance)
def test_multiple_plugins_mixed_specified_for_service_type(self):
cfg.CONF.set_override("service_plugins",
["neutron.tests.unit.dummy_plugin."
"DummyServicePlugin", "dummy"])
"DummyServicePlugin",
dummy_plugin.Dummy.get_alias()])
cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
self.assertRaises(ValueError, manager.NeutronManager.get_instance)
@ -100,7 +114,7 @@ class NeutronManagerTestCase(base.BaseTestCase):
"neutron.tests.unit.test_manager."
"MultiServiceCorePlugin")
e = self.assertRaises(ValueError, manager.NeutronManager.get_instance)
self.assertIn(constants.DUMMY, str(e))
self.assertIn(dummy_plugin.DUMMY_SERVICE_TYPE, str(e))
def test_core_plugin_supports_services(self):
cfg.CONF.set_override("core_plugin",
@ -110,17 +124,18 @@ class NeutronManagerTestCase(base.BaseTestCase):
svc_plugins = directory.get_plugins()
self.assertEqual(3, len(svc_plugins))
self.assertIn(lib_const.CORE, svc_plugins.keys())
self.assertIn(constants.LOADBALANCER, svc_plugins.keys())
self.assertIn(constants.DUMMY, svc_plugins.keys())
self.assertIn(lib_const.LOADBALANCER, svc_plugins.keys())
self.assertIn(dummy_plugin.DUMMY_SERVICE_TYPE, svc_plugins.keys())
def test_load_default_service_plugins(self):
self.patched_default_svc_plugins.return_value = {
'neutron.tests.unit.dummy_plugin.DummyServicePlugin': 'DUMMY'
'neutron.tests.unit.dummy_plugin.DummyServicePlugin':
dummy_plugin.DUMMY_SERVICE_TYPE
}
cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
manager.init()
svc_plugins = directory.get_plugins()
self.assertIn('DUMMY', svc_plugins)
self.assertIn(dummy_plugin.DUMMY_SERVICE_TYPE, svc_plugins)
def test_post_plugin_validation(self):
cfg.CONF.import_opt('dhcp_agents_per_network',
@ -148,7 +163,7 @@ class NeutronManagerTestCase(base.BaseTestCase):
"CorePluginWithAgentNotifiers")
expected = {'l3': 'l3_agent_notifier',
'dhcp': 'dhcp_agent_notifier',
'dummy': 'dummy_agent_notifier'}
dummy_plugin.Dummy.get_alias(): 'dummy_agent_notifier'}
manager.init()
core_plugin = directory.get_plugin()
self.assertEqual(expected, core_plugin.agent_notifiers)