remove neutron.common.constants

All of the externally consumed variables from neutron.common.constants
now live in neutron-lib. This patch removes neutron.common.constants
and switches all uses over to lib.

NeutronLibImpact

Depends-On: https://review.openstack.org/#/c/647836/
Change-Id: I3c2f28ecd18996a1cee1ae3af399166defe9da87
This commit is contained in:
Boden R 2019-03-29 13:27:57 -06:00
parent fb6094fe8d
commit 9bbe9911c4
99 changed files with 324 additions and 619 deletions

View File

@ -188,7 +188,7 @@ service plugins. Such as router and floatingip owner check for ``router``
service plugin. Developers can register the extension resource name and service
plugin name which were registered in neutron-lib into
``EXT_PARENT_RESOURCE_MAPPING`` which is located in
``neutron.common.constants``.
``neutron_lib.services.constants``.
The check, performed in the ``__call__`` method, works as follows:

View File

@ -34,7 +34,7 @@ from neutron._i18n import _
from neutron.agent.common import ip_lib
from neutron.agent.common import utils
from neutron.agent.ovsdb import impl_idl
from neutron.common import constants as common_constants
from neutron.common import _constants as common_constants
from neutron.common import utils as common_utils
from neutron.conf.agent import ovs_conf
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
@ -826,8 +826,8 @@ class OVSBridge(BaseOVS):
def update_ingress_bw_limit_for_port(self, port_name, max_kbps,
max_burst_kbps):
max_bw_in_bits = max_kbps * common_constants.SI_BASE
max_burst_in_bits = max_burst_kbps * common_constants.SI_BASE
max_bw_in_bits = max_kbps * p_const.SI_BASE
max_burst_in_bits = max_burst_kbps * p_const.SI_BASE
port_type = self._get_port_val(port_name, "type")
if port_type in constants.OVS_DPDK_PORT_TYPES:
self._update_ingress_bw_limit_for_dpdk_port(
@ -847,18 +847,18 @@ class OVSBridge(BaseOVS):
other_config = qos_res['other_config']
max_bw_in_bits = other_config.get('max-rate')
if max_bw_in_bits is not None:
qos_max_kbps = int(max_bw_in_bits) / common_constants.SI_BASE
qos_max_kbps = int(max_bw_in_bits) / p_const.SI_BASE
queue_res = self.find_queue(port_name, QOS_DEFAULT_QUEUE)
if queue_res:
other_config = queue_res['other_config']
max_bw_in_bits = other_config.get('max-rate')
if max_bw_in_bits is not None:
queue_max_kbps = int(max_bw_in_bits) / common_constants.SI_BASE
queue_max_kbps = int(max_bw_in_bits) / p_const.SI_BASE
max_burst_in_bits = other_config.get('burst')
if max_burst_in_bits is not None:
max_burst_kbit = (
int(max_burst_in_bits) / common_constants.SI_BASE)
int(max_burst_in_bits) / p_const.SI_BASE)
if qos_max_kbps == queue_max_kbps:
max_kbps = qos_max_kbps
@ -879,12 +879,12 @@ class OVSBridge(BaseOVS):
if max_bw_in_bytes is not None:
max_kbps = common_utils.bits_to_kilobits(
common_utils.bytes_to_bits(int(float(max_bw_in_bytes))),
common_constants.SI_BASE)
p_const.SI_BASE)
max_burst_in_bytes = other_config.get("cbs")
if max_burst_in_bytes is not None:
max_burst_kbit = common_utils.bits_to_kilobits(
common_utils.bytes_to_bits(int(float(max_burst_in_bytes))),
common_constants.SI_BASE)
p_const.SI_BASE)
return max_kbps, max_burst_kbit
def delete_ingress_bw_limit_for_port(self, port_name):

View File

@ -39,7 +39,6 @@ from neutron.agent.linux import dhcp
from neutron.agent.linux import external_process
from neutron.agent.metadata import driver as metadata_driver
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as n_const
from neutron.common import utils
from neutron import manager
@ -714,7 +713,7 @@ class DhcpPluginApi(object):
self.host = host
target = oslo_messaging.Target(
topic=topic,
namespace=n_const.RPC_NAMESPACE_DHCP_PLUGIN,
namespace=constants.RPC_NAMESPACE_DHCP_PLUGIN,
version='1.0')
self.client = n_rpc.get_client(target)

View File

@ -54,7 +54,6 @@ from neutron.agent.linux import pd
from neutron.agent.linux import utils as linux_utils
from neutron.agent.metadata import driver as metadata_driver
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as l3_constants
from neutron.common import ipv6_utils
from neutron.common import utils
from neutron import manager
@ -593,7 +592,7 @@ class L3NATAgent(ha.AgentMixin,
# For HA routers check that DB state matches actual state
if router.get('ha') and not is_dvr_only_agent and is_ha_router:
self.check_ha_state_for_router(
router['id'], router.get(l3_constants.HA_ROUTER_STATE_KEY))
router['id'], router.get(lib_const.HA_ROUTER_STATE_KEY))
ri.router = router
registry.notify(resources.ROUTER, events.BEFORE_UPDATE,
self, router=ri)

View File

@ -29,7 +29,6 @@ from neutron.agent.l3 import namespaces
from neutron.agent.l3 import router_info
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.common import constants
from neutron.common import utils as common_utils
from neutron.ipam import utils as ipam_utils
@ -70,7 +69,7 @@ class FipNamespace(namespaces.Namespace):
use_ipv6=self.use_ipv6)
path = os.path.join(agent_conf.state_path, 'fip-linklocal-networks')
self.local_subnets = lla.LinkLocalAllocator(
path, constants.DVR_FIP_LL_CIDR)
path, lib_constants.DVR_FIP_LL_CIDR)
self.destroyed = False
self._stale_fips_checked = False

View File

@ -25,7 +25,6 @@ import six
from neutron.agent.l3 import dvr_fip_ns
from neutron.agent.l3 import dvr_router_base
from neutron.agent.linux import ip_lib
from neutron.common import constants as n_const
from neutron.common import utils as common_utils
LOG = logging.getLogger(__name__)
@ -475,7 +474,8 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
def get_floating_agent_gw_interface(self, ext_net_id):
"""Filter Floating Agent GW port for the external network."""
fip_ports = self.router.get(n_const.FLOATINGIP_AGENT_INTF_KEY, [])
fip_ports = self.router.get(
lib_constants.FLOATINGIP_AGENT_INTF_KEY, [])
return next(
(p for p in fip_ports if p['network_id'] == ext_net_id), None)

View File

@ -10,10 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from oslo_log import log as logging
from neutron.agent.l3 import router_info as router
from neutron.common import constants as l3_constants
LOG = logging.getLogger(__name__)
@ -31,7 +31,7 @@ class DvrRouterBase(router.RouterInfo):
self.snat_ports = self.get_snat_interfaces()
def get_snat_interfaces(self):
return self.router.get(l3_constants.SNAT_ROUTER_INTF_KEY, [])
return self.router.get(constants.SNAT_ROUTER_INTF_KEY, [])
def get_snat_port_for_internal_port(self, int_port, snat_ports=None):
"""Return the SNAT port for the given internal interface port."""

View File

@ -16,6 +16,8 @@
import collections
import netaddr
from neutron_lib.agent import l3_extension
from neutron_lib import constants
from neutron_lib import rpc as n_rpc
from oslo_concurrency import lockutils
from oslo_log import log as logging
@ -25,9 +27,7 @@ from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import constants
from neutron_lib.agent import l3_extension
from neutron_lib import constants as lib_consts
LOG = logging.getLogger(__name__)
DEFAULT_PORT_FORWARDING_CHAIN = 'fip-pf'
@ -174,7 +174,7 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
device = ip_lib.IPDevice(interface_name, namespace=namespace)
is_distributed = ri.router.get('distributed')
ha_port = ri.router.get(lib_consts.HA_INTERFACE_KEY, None)
ha_port = ri.router.get(constants.HA_INTERFACE_KEY, None)
fip_statuses = {}
for port_forwarding in port_forwardings:
# check if the port forwarding is managed in this agent from
@ -202,10 +202,10 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
fip_ip)
else:
ri._add_vip(fip_cidr, interface_name)
status = lib_consts.FLOATINGIP_STATUS_ACTIVE
status = constants.FLOATINGIP_STATUS_ACTIVE
except Exception:
# Any error will causes the fip status to be set 'ERROR'
status = lib_consts.FLOATINGIP_STATUS_ERROR
status = constants.FLOATINGIP_STATUS_ERROR
LOG.warning("Unable to configure floating IP %(fip_id)s "
"for port forwarding %(pf_id)s",
{'fip_id': port_forwarding.floatingip_id,
@ -218,7 +218,7 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
if status:
fip_statuses[port_forwarding.floatingip_id] = status
if ha_port and ha_port['status'] == lib_consts.PORT_STATUS_ACTIVE:
if ha_port and ha_port['status'] == constants.PORT_STATUS_ACTIVE:
ri.enable_keepalived()
for port_forwarding in port_forwardings:
@ -266,8 +266,8 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
is_distributed = ri.router.get('distributed')
agent_mode = ri.agent_conf.agent_mode
if (is_distributed and
agent_mode in [lib_consts.L3_AGENT_MODE_DVR_NO_EXTERNAL,
lib_consts.L3_AGENT_MODE_DVR]):
agent_mode in [constants.L3_AGENT_MODE_DVR_NO_EXTERNAL,
constants.L3_AGENT_MODE_DVR]):
# just support centralized cases
return False
return True
@ -343,7 +343,7 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
def _sync_and_remove_fip(self, context, fip_id_cidrs, device, ri):
if not fip_id_cidrs:
return
ha_port = ri.router.get(lib_consts.HA_INTERFACE_KEY)
ha_port = ri.router.get(constants.HA_INTERFACE_KEY)
fip_ids = [item[0] for item in fip_id_cidrs]
pfs = self.resource_rpc.bulk_pull(context, resources.PORTFORWARDING,
filter_kwargs={

View File

@ -16,12 +16,12 @@
import os
import eventlet
from neutron_lib import constants
from oslo_log import log as logging
from oslo_utils import fileutils
import webob
from neutron.agent.linux import utils as agent_utils
from neutron.common import constants
from neutron.notifiers import batch_notifier
LOG = logging.getLogger(__name__)

View File

@ -27,7 +27,6 @@ from neutron.agent.l3 import router_info as router
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import keepalived
from neutron.common import constants as const
from neutron.common import utils as common_utils
from neutron.extensions import revisions
from neutron.extensions import timestamp
@ -294,11 +293,12 @@ class HaRouter(router.RouterInfo):
if self._should_delete_ipv6_lladdr(ipv6_lladdr):
self.driver.configure_ipv6_ra(self.ha_namespace, interface_name,
const.ACCEPT_RA_DISABLED)
n_consts.ACCEPT_RA_DISABLED)
device.addr.flush(n_consts.IP_VERSION_6)
else:
self.driver.configure_ipv6_ra(self.ha_namespace, interface_name,
const.ACCEPT_RA_WITHOUT_FORWARDING)
self.driver.configure_ipv6_ra(
self.ha_namespace, interface_name,
n_consts.ACCEPT_RA_WITHOUT_FORWARDING)
self._remove_vip(ipv6_lladdr)
self._add_vip(ipv6_lladdr, interface_name, scope='link')

View File

@ -25,7 +25,6 @@ from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.agent.linux import ra
from neutron.common import constants as n_const
from neutron.common import ipv6_utils
from neutron.common import utils as common_utils
from neutron.ipam import utils as ipam_utils
@ -705,13 +704,14 @@ class RouterInfo(object):
if not self.is_v6_gateway_set(gateway_ips):
# There is no IPv6 gw_ip, use RouterAdvt for default route.
self.driver.configure_ipv6_ra(
ns_name, interface_name, n_const.ACCEPT_RA_WITH_FORWARDING)
ns_name, interface_name,
lib_constants.ACCEPT_RA_WITH_FORWARDING)
else:
# Otherwise, disable it
disable_ra = True
if disable_ra:
self.driver.configure_ipv6_ra(ns_name, interface_name,
n_const.ACCEPT_RA_DISABLED)
lib_constants.ACCEPT_RA_DISABLED)
self.driver.configure_ipv6_forwarding(ns_name, interface_name, enabled)
# This will make sure the 'all' setting is the same as the interface,
# which is needed for forwarding to work. Don't disable once it's
@ -858,7 +858,7 @@ class RouterInfo(object):
'snat', '-m mark ! --mark %s/%s '
'-m conntrack --ctstate DNAT '
'-j SNAT --to-source %s'
% (ext_in_mark, n_const.ROUTER_MARK_MASK, ex_gw_ip))
% (ext_in_mark, lib_constants.ROUTER_MARK_MASK, ex_gw_ip))
return [dont_snat_traffic_to_internal_ports_if_not_to_floating_ip,
snat_internal_traffic_to_floating_ip]
@ -872,7 +872,7 @@ class RouterInfo(object):
mark = self.agent_conf.external_ingress_mark
mark_packets_entering_external_gateway_port = (
'mark', '-i %s -j MARK --set-xmark %s/%s' %
(interface_name, mark, n_const.ROUTER_MARK_MASK))
(interface_name, mark, lib_constants.ROUTER_MARK_MASK))
return [mark_packets_entering_external_gateway_port]
def _empty_snat_chains(self, iptables_manager):
@ -1035,7 +1035,7 @@ class RouterInfo(object):
'-j MARK --set-xmark %(value)s/%(mask)s' %
{'interface_name': INTERNAL_DEV_PREFIX + '+',
'value': self.agent_conf.metadata_access_mark,
'mask': n_const.ROUTER_MARK_MASK})
'mask': lib_constants.ROUTER_MARK_MASK})
self.iptables_manager.ipv4['mangle'].add_rule(
'PREROUTING', mark_metadata_for_internal_interfaces)

View File

@ -38,7 +38,6 @@ from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.cmd import runtime_checks as checks
from neutron.common import constants as n_const
from neutron.common import ipv6_utils
from neutron.common import utils as common_utils
from neutron.ipam import utils as ipam_utils
@ -1517,7 +1516,7 @@ class DeviceManager(object):
root_namespace=True)
if ipv6_utils.is_enabled_and_bind_by_default():
self.driver.configure_ipv6_ra(network.namespace, 'default',
n_const.ACCEPT_RA_DISABLED)
constants.ACCEPT_RA_DISABLED)
if ip_lib.ensure_device_is_ready(interface_name,
namespace=network.namespace):

View File

@ -25,7 +25,6 @@ import six
from neutron.agent.common import ovs_lib
from neutron.agent.linux import ip_lib
from neutron.common import constants as n_const
from neutron.common import utils
LOG = logging.getLogger(__name__)
@ -39,7 +38,7 @@ def _get_veth(name1, name2, namespace2):
@six.add_metaclass(abc.ABCMeta)
class LinuxInterfaceDriver(object):
DEV_NAME_LEN = n_const.LINUX_DEV_LEN
DEV_NAME_LEN = constants.LINUX_DEV_LEN
DEV_NAME_PREFIX = constants.TAP_DEVICE_PREFIX
def __init__(self, conf):

View File

@ -15,12 +15,12 @@ import re
import eventlet
import netaddr
from neutron_lib import constants
from neutron_lib import exceptions
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron.agent.linux import utils as linux_utils
from neutron.common import constants as n_const
LOG = logging.getLogger(__name__)
CONTRACK_MGRS = {}
@ -186,7 +186,7 @@ class IpConntrackManager(object):
if match:
# strip off any prefix that the interface is using
short_port_id = (
match.group('dev')[n_const.LINUX_DEV_PREFIX_LEN:])
match.group('dev')[constants.LINUX_DEV_PREFIX_LEN:])
self._device_zone_map[short_port_id] = int(match.group('zone'))
LOG.debug("Populated conntrack zone map: %s", self._device_zone_map)
@ -196,11 +196,11 @@ class IpConntrackManager(object):
# map is populated strictly based on interface names that we don't know
# the full UUID of.
if self.zone_per_port:
identifier = port['device'][n_const.LINUX_DEV_PREFIX_LEN:]
identifier = port['device'][constants.LINUX_DEV_PREFIX_LEN:]
else:
identifier = port['network_id']
return identifier[:(n_const.LINUX_DEV_LEN -
n_const.LINUX_DEV_PREFIX_LEN)]
return identifier[:(constants.LINUX_DEV_LEN -
constants.LINUX_DEV_PREFIX_LEN)]
def get_device_zone(self, port, create=True):
device_key = self._device_key(port)

View File

@ -31,7 +31,6 @@ from neutron.agent.linux import ipset_manager
from neutron.agent.linux import iptables_comments as ic
from neutron.agent.linux import iptables_manager
from neutron.common import _constants as const
from neutron.common import constants as n_const
from neutron.common import ipv6_utils
from neutron.common import utils as c_utils
@ -49,7 +48,7 @@ libc = ctypes.CDLL(util.find_library('libc.so.6'))
def get_hybrid_port_name(port_name):
return (constants.TAP_DEVICE_PREFIX + port_name)[:n_const.LINUX_DEV_LEN]
return (constants.TAP_DEVICE_PREFIX + port_name)[:constants.LINUX_DEV_LEN]
class mac_iptables(netaddr.mac_eui48):
@ -356,7 +355,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
comment=ic.INPUT_TO_SG)
def _get_br_device_name(self, port):
return ('brq' + port['network_id'])[:n_const.LINUX_DEV_LEN]
return ('brq' + port['network_id'])[:constants.LINUX_DEV_LEN]
def _get_jump_rules(self, port, create=True):
zone = self.ipconntrack.get_device_zone(port, create=create)
@ -696,7 +695,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
def _protocol_name_map(self):
if not self._iptables_protocol_name_map:
tmp_map = n_const.IPTABLES_PROTOCOL_NAME_MAP.copy()
tmp_map = constants.IPTABLES_PROTOCOL_NAME_MAP.copy()
tmp_map.update(self._local_protocol_name_map())
self._iptables_protocol_name_map = tmp_map
return self._iptables_protocol_name_map
@ -948,7 +947,7 @@ class OVSHybridIptablesFirewallDriver(IptablesFirewallDriver):
'%s%s' % (CHAIN_NAME_PREFIX[direction], port['device']))
def _get_br_device_name(self, port):
return ('qvb' + port['device'])[:n_const.LINUX_DEV_LEN]
return ('qvb' + port['device'])[:constants.LINUX_DEV_LEN]
def _get_device_name(self, port):
device_name = super(

View File

@ -25,6 +25,7 @@ import os
import re
import sys
from neutron_lib import constants
from neutron_lib import exceptions
from neutron_lib.exceptions import l3 as l3_exc
from neutron_lib.utils import runtime
@ -37,7 +38,6 @@ from neutron._i18n import _
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_comments as ic
from neutron.agent.linux import utils as linux_utils
from neutron.common import constants
from neutron.conf.agent import common as config
LOG = logging.getLogger(__name__)

View File

@ -17,6 +17,7 @@ import itertools
import os
import netaddr
from neutron_lib import constants
from neutron_lib import exceptions
from neutron_lib.utils import file as file_utils
from oslo_config import cfg
@ -25,7 +26,6 @@ from oslo_utils import fileutils
from neutron._i18n import _
from neutron.agent.linux import external_process
from neutron.common import constants
from neutron.common import utils
VALID_STATES = ['MASTER', 'BACKUP']

View File

@ -15,7 +15,6 @@
from neutron_lib import constants
from neutron.common import constants as n_const
OF_STATE_NOT_TRACKED = "-trk"
OF_STATE_TRACKED = "+trk"
@ -47,6 +46,6 @@ REVERSE_IP_PROTOCOL_MAP_WITH_PORTS = {
PROTOCOLS_WITH_PORTS}
ethertype_to_dl_type_map = {
constants.IPv4: n_const.ETHERTYPE_IP,
constants.IPv6: n_const.ETHERTYPE_IPV6,
constants.IPv4: constants.ETHERTYPE_IP,
constants.IPv6: constants.ETHERTYPE_IPV6,
}

View File

@ -31,7 +31,6 @@ from neutron.agent.linux.openvswitch_firewall import constants as ovsfw_consts
from neutron.agent.linux.openvswitch_firewall import exceptions
from neutron.agent.linux.openvswitch_firewall import iptables
from neutron.agent.linux.openvswitch_firewall import rules
from neutron.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants \
as ovs_consts
@ -732,7 +731,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
priority=95,
in_port=port.ofport,
reg_port=port.ofport,
dl_type=constants.ETHERTYPE_IPV6,
dl_type=lib_const.ETHERTYPE_IPV6,
nw_proto=lib_const.PROTO_NUM_IPV6_ICMP,
icmp_type=icmp_type,
actions='resubmit(,%d)' % (
@ -804,7 +803,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
in_port=port.ofport,
reg_port=port.ofport,
dl_src=mac_addr,
dl_type=constants.ETHERTYPE_ARP,
dl_type=lib_const.ETHERTYPE_ARP,
arp_spa=ip_addr,
actions='resubmit(,%d)' % (
ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE)
@ -813,7 +812,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
table=ovs_consts.BASE_EGRESS_TABLE,
priority=65,
reg_port=port.ofport,
dl_type=constants.ETHERTYPE_IP,
dl_type=lib_const.ETHERTYPE_IP,
in_port=port.ofport,
dl_src=mac_addr,
nw_src=ip_addr,
@ -831,7 +830,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
priority=65,
reg_port=port.ofport,
in_port=port.ofport,
dl_type=constants.ETHERTYPE_IPV6,
dl_type=lib_const.ETHERTYPE_IPV6,
dl_src=mac_addr,
ipv6_src=ip_addr,
actions='ct(table={:d},zone=NXM_NX_REG{:d}[0..15])'.format(
@ -841,8 +840,8 @@ class OVSFirewallDriver(firewall.FirewallDriver):
# DHCP discovery
for dl_type, src_port, dst_port in (
(constants.ETHERTYPE_IP, 68, 67),
(constants.ETHERTYPE_IPV6, 546, 547)):
(lib_const.ETHERTYPE_IP, 68, 67),
(lib_const.ETHERTYPE_IPV6, 546, 547)):
self._add_flow(
table=ovs_consts.BASE_EGRESS_TABLE,
priority=80,
@ -857,8 +856,8 @@ class OVSFirewallDriver(firewall.FirewallDriver):
)
# Ban dhcp service running on an instance
for dl_type, src_port, dst_port in (
(constants.ETHERTYPE_IP, 67, 68),
(constants.ETHERTYPE_IPV6, 547, 546)):
(lib_const.ETHERTYPE_IP, 67, 68),
(lib_const.ETHERTYPE_IPV6, 547, 546)):
self._add_flow(
table=ovs_consts.BASE_EGRESS_TABLE,
priority=70,
@ -877,7 +876,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
priority=70,
in_port=port.ofport,
reg_port=port.ofport,
dl_type=constants.ETHERTYPE_IPV6,
dl_type=lib_const.ETHERTYPE_IPV6,
nw_proto=lib_const.PROTO_NUM_IPV6_ICMP,
icmp_type=lib_const.ICMPV6_TYPE_RA,
actions='resubmit(,%d)' % ovs_consts.DROPPED_TRAFFIC_TABLE
@ -906,7 +905,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
ovsfw_consts.REG_PORT,
ovs_consts.BASE_INGRESS_TABLE),
)
for ethertype in [constants.ETHERTYPE_IP, constants.ETHERTYPE_IPV6]:
for ethertype in [lib_const.ETHERTYPE_IP, lib_const.ETHERTYPE_IPV6]:
self._add_flow(
table=ovs_consts.ACCEPT_OR_INGRESS_TABLE,
priority=90,
@ -964,7 +963,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
ct_state=ovsfw_consts.OF_STATE_NOT_ESTABLISHED,
actions='resubmit(,%d)' % ovs_consts.DROPPED_TRAFFIC_TABLE
)
for ethertype in [constants.ETHERTYPE_IP, constants.ETHERTYPE_IPV6]:
for ethertype in [lib_const.ETHERTYPE_IP, lib_const.ETHERTYPE_IPV6]:
self._add_flow(
table=ovs_consts.RULES_EGRESS_TABLE,
priority=40,
@ -983,7 +982,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
table=ovs_consts.BASE_INGRESS_TABLE,
priority=100,
reg_port=port.ofport,
dl_type=constants.ETHERTYPE_IPV6,
dl_type=lib_const.ETHERTYPE_IPV6,
nw_proto=lib_const.PROTO_NUM_IPV6_ICMP,
icmp_type=icmp_type,
actions='output:{:d}'.format(port.ofport)
@ -994,7 +993,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
self._add_flow(
table=ovs_consts.BASE_INGRESS_TABLE,
priority=100,
dl_type=constants.ETHERTYPE_ARP,
dl_type=lib_const.ETHERTYPE_ARP,
reg_port=port.ofport,
actions='output:{:d}'.format(port.ofport)
)
@ -1002,8 +1001,8 @@ class OVSFirewallDriver(firewall.FirewallDriver):
# DHCP offers
for dl_type, src_port, dst_port in (
(constants.ETHERTYPE_IP, 67, 68),
(constants.ETHERTYPE_IPV6, 547, 546)):
(lib_const.ETHERTYPE_IP, 67, 68),
(lib_const.ETHERTYPE_IPV6, 547, 546)):
self._add_flow(
table=ovs_consts.BASE_INGRESS_TABLE,
priority=95,
@ -1016,7 +1015,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
)
# Track untracked
for dl_type in (constants.ETHERTYPE_IP, constants.ETHERTYPE_IPV6):
for dl_type in (lib_const.ETHERTYPE_IP, lib_const.ETHERTYPE_IPV6):
self._add_flow(
table=ovs_consts.BASE_INGRESS_TABLE,
priority=90,
@ -1071,7 +1070,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
ct_state=ovsfw_consts.OF_STATE_NOT_ESTABLISHED,
actions='resubmit(,%d)' % ovs_consts.DROPPED_TRAFFIC_TABLE
)
for ethertype in [constants.ETHERTYPE_IP, constants.ETHERTYPE_IPV6]:
for ethertype in [lib_const.ETHERTYPE_IP, lib_const.ETHERTYPE_IPV6]:
self._add_flow(
table=ovs_consts.RULES_INGRESS_TABLE,
priority=40,

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import constants as n_const
from neutron_lib import constants as n_const
def get_device_port_name(port_id):

View File

@ -27,7 +27,6 @@ from oslo_utils import netutils
import six
from stevedore import driver
from neutron.common import constants as l3_constants
from neutron.common import utils
LOG = logging.getLogger(__name__)
@ -244,7 +243,7 @@ class PrefixDelegation(object):
gw_ifname,
ns_name,
lla_with_mask),
timeout=l3_constants.LLA_TASK_TIMEOUT,
timeout=n_const.LLA_TASK_TIMEOUT,
sleep=2)
def _lla_available(self, gw_ifname, ns_name, lla_with_mask):

View File

@ -27,7 +27,6 @@ import six
from neutron.agent.linux import external_process
from neutron.agent.linux import utils
from neutron.common import constants as n_const
RADVD_SERVICE_NAME = 'radvd'
RADVD_SERVICE_CMD = 'radvd'
@ -43,7 +42,7 @@ CONFIG_TEMPLATE = jinja2.Template("""interface {{ interface_name }}
MinRtrAdvInterval {{ min_rtr_adv_interval }};
MaxRtrAdvInterval {{ max_rtr_adv_interval }};
{% if network_mtu >= n_const.IPV6_MIN_MTU %}
{% if network_mtu >= constants.IPV6_MIN_MTU %}
AdvLinkMTU {{network_mtu}};
{% endif %}
@ -128,7 +127,7 @@ class DaemonMonitor(object):
auto_config_prefixes=auto_config_prefixes,
stateful_config_prefixes=stateful_config_prefixes,
dns_servers=dns_servers[0:MAX_RDNSS_ENTRIES],
n_const=n_const,
n_const=constants,
constants=constants,
min_rtr_adv_interval=self._agent_conf.min_rtr_adv_interval,
max_rtr_adv_interval=self._agent_conf.max_rtr_adv_interval,

View File

@ -33,7 +33,6 @@ from neutron._i18n import _
from neutron.agent.linux import utils as agent_utils
from neutron.agent import rpc as agent_rpc
from neutron.common import cache_utils as cache
from neutron.common import constants as n_const
from neutron.common import ipv6_utils
from neutron.conf.agent.metadata import config
@ -63,7 +62,7 @@ class MetadataPluginAPI(object):
def __init__(self, topic):
target = oslo_messaging.Target(
topic=topic,
namespace=n_const.RPC_NAMESPACE_METADATA,
namespace=constants.RPC_NAMESPACE_METADATA,
version='1.0')
self.client = n_rpc.get_client(target)

View File

@ -21,6 +21,7 @@ import pwd
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib import exceptions
from oslo_config import cfg
from oslo_log import log as logging
@ -29,7 +30,6 @@ from neutron._i18n import _
from neutron.agent.l3 import ha_router
from neutron.agent.l3 import namespaces
from neutron.agent.linux import external_process
from neutron.common import constants
LOG = logging.getLogger(__name__)

View File

@ -31,7 +31,7 @@ from oslo_utils import uuidutils
from neutron.agent import resource_cache
from neutron.api.rpc.callbacks import resources
from neutron.common import constants as n_const
from neutron.common import _constants as n_const
from neutron import objects
LOG = logging.getLogger(__name__)
@ -78,7 +78,7 @@ class PluginReportStateAPI(object):
"""
def __init__(self, topic):
target = oslo_messaging.Target(topic=topic, version='1.2',
namespace=n_const.RPC_NAMESPACE_STATE)
namespace=constants.RPC_NAMESPACE_STATE)
self.client = lib_rpc.get_client(target)
def has_alive_neutron_server(self, context, **kwargs):
@ -321,7 +321,7 @@ class CacheBackedPluginApi(PluginApi):
LOG.debug("Device %s has no active binding in this host",
port_obj)
return {'device': device,
n_const.NO_ACTIVE_BINDING: True}
constants.NO_ACTIVE_BINDING: True}
net = self.remote_resource_cache.get_resource_by_id(
resources.NETWORK, port_obj.network_id)
net_qos_policy_id = net.qos_policy_id

View File

@ -22,7 +22,7 @@ from oslo_log import log as logging
import oslo_messaging
from neutron.agent import firewall
from neutron.common import constants as common_constants
from neutron.common import _constants as common_constants
from neutron.conf.agent import securitygroups_rpc as sc_cfg

View File

@ -16,6 +16,7 @@
import functools
from neutron_lib.api import attributes
from neutron_lib import constants
from neutron_lib.db import model_base
from neutron_lib import exceptions
from oslo_config import cfg
@ -27,7 +28,6 @@ from webob import exc
from neutron._i18n import _
from neutron.api import extensions
from neutron.common import constants
from neutron import wsgi

View File

@ -32,7 +32,6 @@ import oslo_messaging
from oslo_utils import excutils
from neutron._i18n import _
from neutron.common import constants as n_const
from neutron.common import utils
from neutron.db import provisioning_blocks
from neutron.extensions import segment as segment_ext
@ -73,7 +72,7 @@ class DhcpRpcCallback(object):
# the major version as above applies here too.
target = oslo_messaging.Target(
namespace=n_const.RPC_NAMESPACE_DHCP_PLUGIN,
namespace=constants.RPC_NAMESPACE_DHCP_PLUGIN,
version='1.6')
def _get_active_networks(self, context, **kwargs):

View File

@ -14,13 +14,13 @@
# under the License.
from neutron_lib.agent import topics
from neutron_lib import constants
from neutron_lib.plugins import directory
from neutron_lib import rpc as n_rpc
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
import oslo_messaging
from neutron.common import constants
LOG = logging.getLogger(__name__)

View File

@ -26,8 +26,6 @@ from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from neutron.common import constants as n_const
LOG = logging.getLogger(__name__)
@ -150,7 +148,7 @@ class L3RpcCallback(object):
gw_port_host,
router.get('gw_port'),
router['id'])
for p in router.get(n_const.SNAT_ROUTER_INTF_KEY, []):
for p in router.get(constants.SNAT_ROUTER_INTF_KEY, []):
self._ensure_host_set_on_port(
context, gw_port_host, p, router['id'],
ha_router_port=router.get('ha'))

View File

@ -13,11 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron_lib import constants
from neutron_lib.plugins import directory
import oslo_messaging
from neutron.common import constants
class MetadataRpcCallback(object):
"""Metadata agent RPC callback in plugin implementations.

View File

@ -16,6 +16,7 @@
import collections
from neutron_lib.agent import topics
from neutron_lib import constants
from neutron_lib import exceptions
from neutron_lib import rpc as n_rpc
from oslo_log import helpers as log_helpers
@ -28,7 +29,6 @@ from neutron.api.rpc.callbacks import exceptions as rpc_exc
from neutron.api.rpc.callbacks.producer import registry as prod_registry
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.callbacks import version_manager
from neutron.common import constants
from neutron.objects import base as obj_base
LOG = logging.getLogger(__name__)

View File

@ -17,6 +17,7 @@ import collections
from neutron_lib.agent import topics
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib import constants
from neutron_lib.plugins import directory
from neutron_lib import rpc as n_rpc
from neutron_lib.utils import net
@ -24,7 +25,6 @@ from oslo_log import log as logging
import oslo_messaging
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import constants
from neutron.db import securitygroups_rpc_base as sg_rpc_base
LOG = logging.getLogger(__name__)

View File

@ -20,6 +20,7 @@ from neutron_lib.api import attributes
from neutron_lib.api import faults
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib import exceptions
from neutron_lib import rpc as n_rpc
@ -32,7 +33,6 @@ import webob.exc
from neutron._i18n import _
from neutron.api import api_common
from neutron.api.v2 import resource as wsgi_resource
from neutron.common import constants as n_const
from neutron import policy
from neutron import quota
from neutron.quota import resource_registry
@ -652,7 +652,7 @@ class Controller(object):
# Make a list of attributes to be updated to inform the policy engine
# which attributes are set explicitly so that it can distinguish them
# from the ones that are set to their default values.
orig_obj[n_const.ATTRIBUTES_TO_UPDATE] = body[self._resource].keys()
orig_obj[constants.ATTRIBUTES_TO_UPDATE] = body[self._resource].keys()
# Then get the ext_parent_id, format to ext_parent_parent_resource_id
if self._parent_id_name in orig_obj:
self._set_parent_id_into_ext_resources_request(
@ -798,13 +798,13 @@ class Controller(object):
self.parent['member_name'] in
service_const.EXT_PARENT_RESOURCE_MAPPING):
resource_item.setdefault(
"%s_%s" % (n_const.EXT_PARENT_PREFIX,
"%s_%s" % (constants.EXT_PARENT_PREFIX,
self._parent_id_name),
parent_id)
# If this func is called by create/update/delete, we just add.
else:
resource_item.setdefault(
"%s_%s" % (n_const.EXT_PARENT_PREFIX, self._parent_id_name),
"%s_%s" % (constants.EXT_PARENT_PREFIX, self._parent_id_name),
parent_id)

View File

@ -32,7 +32,6 @@ from neutron.agent.linux import ip_link_support
from neutron.agent.linux import keepalived
from neutron.agent.linux import utils as agent_utils
from neutron.cmd import runtime_checks
from neutron.common import constants
from neutron.common import utils as common_utils
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants as ovs_const
@ -137,7 +136,7 @@ def icmpv6_header_match_supported():
return ofctl_arg_supported(cmd='add-flow',
table=ovs_const.ARP_SPOOF_TABLE,
priority=1,
dl_type=constants.ETHERTYPE_IPV6,
dl_type=n_consts.ETHERTYPE_IPV6,
nw_proto=n_consts.PROTO_NUM_IPV6_ICMP,
icmp_type=n_consts.ICMPV6_TYPE_NA,
nd_target='fdf8:f53b:82e4::10',

View File

@ -40,3 +40,19 @@ SG_PORT_PROTO_NAMES = [
IPTABLES_MULTIPORT_ONLY_PROTOCOLS = [
constants.PROTO_NAME_UDPLITE
]
# Number of resources for neutron agent side functions to deal
# with large sets.
# Setting this value does not count on special conditions, it is just a human
# countable or scalable number. [1] gives us the method to test the scale
# issue. And we have tested the value of 1000, 500, 200, 100. But for 100,
# ovs-agent will have a lower timeout probability. And according to the
# testing result, step size 100 can indeed cost about 10% much more time
# than 500/1000. But such extra time looks inevitably needed to be sacrificed
# for the restart success rate.
# [1] http://paste.openstack.org/show/745685/
AGENT_RES_PROCESSING_STEP = 100
# Number of resources for neutron to divide the large RPC
# call data sets.
RPC_RES_PROCESSING_STEP = 20

View File

@ -1,257 +0,0 @@
# Copyright (c) 2012 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron_lib import constants as lib_constants
ROUTER_PORT_OWNERS = lib_constants.ROUTER_INTERFACE_OWNERS_SNAT + \
(lib_constants.DEVICE_OWNER_ROUTER_GW,)
ROUTER_STATUS_ACTIVE = 'ACTIVE'
ROUTER_STATUS_ALLOCATING = 'ALLOCATING'
ROUTER_STATUS_ERROR = 'ERROR'
VALID_ROUTER_STATUS = (ROUTER_STATUS_ACTIVE,
ROUTER_STATUS_ALLOCATING,
ROUTER_STATUS_ERROR)
HA_ROUTER_STATE_KEY = '_ha_state'
METERING_LABEL_KEY = '_metering_labels'
FLOATINGIP_AGENT_INTF_KEY = '_floatingip_agent_interfaces'
SNAT_ROUTER_INTF_KEY = '_snat_router_interfaces'
HA_NETWORK_NAME = 'HA network tenant %s'
HA_SUBNET_NAME = 'HA subnet tenant %s'
HA_PORT_NAME = 'HA port tenant %s'
HA_ROUTER_STATE_ACTIVE = 'active'
HA_ROUTER_STATE_STANDBY = 'standby'
HA_ROUTER_STATE_UNKNOWN = 'unknown'
VALID_HA_STATES = (HA_ROUTER_STATE_ACTIVE, HA_ROUTER_STATE_STANDBY,
HA_ROUTER_STATE_UNKNOWN)
PAGINATION_INFINITE = 'infinite'
SORT_DIRECTION_ASC = 'asc'
SORT_DIRECTION_DESC = 'desc'
ETHERTYPE_NAME_ARP = 'arp'
ETHERTYPE_ARP = 0x0806
ETHERTYPE_IP = 0x0800
ETHERTYPE_IPV6 = 0x86DD
IP_PROTOCOL_NAME_ALIASES = {lib_constants.PROTO_NAME_IPV6_ICMP_LEGACY:
lib_constants.PROTO_NAME_IPV6_ICMP}
IP_PROTOCOL_NUM_TO_NAME_MAP = {
str(v): k for k, v in lib_constants.IP_PROTOCOL_MAP.items()}
# When using iptables-save we specify '-p {proto}',
# but sometimes those values are not identical. This is a map
# of known protocol numbers that require a name to be used and
# protocol names that require a different name to be used,
# because that is how iptables-save will display them.
#
# This is how the list was created, so there is a possibility
# it will need to be updated in the future:
#
# $ for num in {0..255}; do iptables -A INPUT -p $num; done
# $ iptables-save
#
# These cases are special, and were found by inspection:
# - 'ipv6-encap' uses 'ipv6'
# - 'icmpv6' uses 'ipv6-icmp'
# - 'pgm' uses '113' instead of its name
# - protocol '0' uses no -p argument
IPTABLES_PROTOCOL_NAME_MAP = {lib_constants.PROTO_NAME_IPV6_ENCAP: 'ipv6',
lib_constants.PROTO_NAME_IPV6_ICMP_LEGACY:
'ipv6-icmp',
lib_constants.PROTO_NAME_PGM: '113',
'0': None,
'1': 'icmp',
'2': 'igmp',
'3': 'ggp',
'4': 'ipencap',
'5': 'st',
'6': 'tcp',
'8': 'egp',
'9': 'igp',
'12': 'pup',
'17': 'udp',
'20': 'hmp',
'22': 'xns-idp',
'27': 'rdp',
'29': 'iso-tp4',
'33': 'dccp',
'36': 'xtp',
'37': 'ddp',
'38': 'idpr-cmtp',
'41': 'ipv6',
'43': 'ipv6-route',
'44': 'ipv6-frag',
'45': 'idrp',
'46': 'rsvp',
'47': 'gre',
'50': 'esp',
'51': 'ah',
'57': 'skip',
'58': 'ipv6-icmp',
'59': 'ipv6-nonxt',
'60': 'ipv6-opts',
'73': 'rspf',
'81': 'vmtp',
'88': 'eigrp',
'89': 'ospf',
'93': 'ax.25',
'94': 'ipip',
'97': 'etherip',
'98': 'encap',
'103': 'pim',
'108': 'ipcomp',
'112': 'vrrp',
'115': 'l2tp',
'124': 'isis',
'132': 'sctp',
'133': 'fc',
'135': 'mobility-header',
'136': 'udplite',
'137': 'mpls-in-ip',
'138': 'manet',
'139': 'hip',
'140': 'shim6',
'141': 'wesp',
'142': 'rohc'}
# A length of a iptables chain name must be less than or equal to 11
# characters.
# <max length of iptables chain name> - (<binary_name> + '-') = 28-(16+1) = 11
MAX_IPTABLES_CHAIN_LEN_WRAP = 11
MAX_IPTABLES_CHAIN_LEN_NOWRAP = 28
# Timeout in seconds for getting an IPv6 LLA
LLA_TASK_TIMEOUT = 40
# length of all device prefixes (e.g. qvo, tap, qvb)
LINUX_DEV_PREFIX_LEN = 3
# must be shorter than linux IFNAMSIZ (which is 16)
LINUX_DEV_LEN = 14
# Possible prefixes to partial port IDs in interface names used by the OVS,
# Linux Bridge, and IVS VIF drivers in Nova and the neutron agents. See the
# 'get_ovs_interfaceid' method in Nova (nova/virt/libvirt/vif.py) for details.
INTERFACE_PREFIXES = (lib_constants.TAP_DEVICE_PREFIX,
lib_constants.VETH_DEVICE_PREFIX,
lib_constants.SNAT_INT_DEV_PREFIX)
ATTRIBUTES_TO_UPDATE = 'attributes_to_update'
# TODO(amuller): Re-define the RPC namespaces once Oslo messaging supports
# Targets with multiple namespaces. Neutron will then implement callbacks
# for its RPC clients in order to support rolling upgrades.
# RPC Interface for agents to call DHCP API implemented on the plugin side
RPC_NAMESPACE_DHCP_PLUGIN = None
# RPC interface for the metadata service to get info from the plugin side
RPC_NAMESPACE_METADATA = None
# RPC interface for agent to plugin security group API
RPC_NAMESPACE_SECGROUP = None
# RPC interface for agent to plugin DVR api
RPC_NAMESPACE_DVR = None
# RPC interface for reporting state back to the plugin
RPC_NAMESPACE_STATE = None
# RPC interface for agent to plugin resources API
RPC_NAMESPACE_RESOURCES = None
# Default network MTU value when not configured
DEFAULT_NETWORK_MTU = 1500
IPV6_MIN_MTU = 1280
ROUTER_MARK_MASK = "0xffff"
VALID_ETHERTYPES = (lib_constants.IPv4, lib_constants.IPv6)
IP_ALLOWED_VERSIONS = [lib_constants.IP_VERSION_4, lib_constants.IP_VERSION_6]
PORT_RANGE_MIN = 1
PORT_RANGE_MAX = 65535
# Configuration values for accept_ra sysctl, copied from linux kernel
# networking (netdev) tree, file Documentation/networking/ip-sysctl.txt
#
# Possible values are:
# 0 Do not accept Router Advertisements.
# 1 Accept Router Advertisements if forwarding is disabled.
# 2 Overrule forwarding behaviour. Accept Router Advertisements
# even if forwarding is enabled.
ACCEPT_RA_DISABLED = 0
ACCEPT_RA_WITHOUT_FORWARDING = 1
ACCEPT_RA_WITH_FORWARDING = 2
# Some components communicate using private address ranges, define
# them all here. These address ranges should not cause any issues
# even if they overlap since they are used in disjoint namespaces,
# but for now they are unique.
# We define the metadata cidr since it falls in the range.
PRIVATE_CIDR_RANGE = '169.254.0.0/16'
DVR_FIP_LL_CIDR = '169.254.64.0/18'
L3_HA_NET_CIDR = '169.254.192.0/18'
METADATA_CIDR = '169.254.169.254/32'
# The only defined IpamAllocation status at this stage is 'ALLOCATED'.
# More states will be available in the future - e.g.: RECYCLABLE
IPAM_ALLOCATION_STATUS_ALLOCATED = 'ALLOCATED'
VALID_IPAM_ALLOCATION_STATUSES = (IPAM_ALLOCATION_STATUS_ALLOCATED,)
# Port binding states for Live Migration
PORT_BINDING_STATUSES = (lib_constants.ACTIVE,
lib_constants.INACTIVE)
VALID_FLOATINGIP_STATUS = (lib_constants.FLOATINGIP_STATUS_ACTIVE,
lib_constants.FLOATINGIP_STATUS_DOWN,
lib_constants.FLOATINGIP_STATUS_ERROR)
# Floating IP host binding states
FLOATING_IP_HOST_UNBOUND = "FLOATING_IP_HOST_UNBOUND"
FLOATING_IP_HOST_NEEDS_BINDING = "FLOATING_IP_HOST_NEEDS_BINDING"
# Possible types of values (e.g. in QoS rule types)
VALUES_TYPE_CHOICES = "choices"
VALUES_TYPE_RANGE = "range"
# Units base
SI_BASE = 1000
IEC_BASE = 1024
# Port bindings handling
NO_ACTIVE_BINDING = 'no_active_binding'
EXT_PARENT_PREFIX = 'ext_parent'
RP_BANDWIDTHS = 'resource_provider_bandwidths'
RP_INVENTORY_DEFAULTS = 'resource_provider_inventory_defaults'
# Number of resources for neutron agent side functions to deal
# with large sets.
# Setting this value does not count on special conditions, it is just a human
# countable or scalable number. [1] gives us the method to test the scale
# issue. And we have tested the value of 1000, 500, 200, 100. But for 100,
# ovs-agent will have a lower timeout probability. And according to the
# testing result, step size 100 can indeed cost about 10% much more time
# than 500/1000. But such extra time looks inevitably needed to be sacrificed
# for the restart success rate.
# [1] http://paste.openstack.org/show/745685/
AGENT_RES_PROCESSING_STEP = 100
# Number of resources for neutron to divide the large RPC
# call data sets.
RPC_RES_PROCESSING_STEP = 20

View File

@ -13,12 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from neutron_lib.utils import net
from oslo_config import cfg
from oslo_service import wsgi
from neutron._i18n import _
from neutron.common import constants
core_opts = [

View File

@ -11,10 +11,10 @@
# under the License.
from neutron_lib import constants as n_const
from oslo_config import cfg
from neutron._i18n import _
from neutron.common import constants as n_const
L3_HA_OPTS = [

View File

@ -42,7 +42,6 @@ import six
from neutron.agent.common import utils
from neutron.api.rpc.callbacks import version_manager
from neutron.common import constants as n_const
from neutron.conf.agent.database import agents_db
from neutron.db.models import agent as agent_model
from neutron.extensions import _availability_zone_filter_lib as azfil_ext
@ -477,7 +476,7 @@ class AgentExtRpcCallback(object):
"""
target = oslo_messaging.Target(version='1.2',
namespace=n_const.RPC_NAMESPACE_STATE)
namespace=constants.RPC_NAMESPACE_STATE)
START_TIME = timeutils.utcnow()
def __init__(self, plugin=None):

View File

@ -33,7 +33,6 @@ from oslo_config import cfg
from oslo_log import log as logging
from sqlalchemy.orm import exc
from neutron.common import constants as n_const
from neutron.db import common_db_mixin
from neutron.db import models_v2
from neutron.objects import base as base_obj
@ -296,7 +295,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
'name': network['name'],
'tenant_id': network['tenant_id'],
'admin_state_up': network['admin_state_up'],
'mtu': network.get('mtu', n_const.DEFAULT_NETWORK_MTU),
'mtu': network.get('mtu', constants.DEFAULT_NETWORK_MTU),
'status': network['status'],
'subnets': [subnet['id']
for subnet in network['subnets']]}

View File

@ -37,7 +37,6 @@ from oslo_utils import excutils
import six
from neutron._i18n import _
from neutron.common import constants as l3_const
from neutron.common import utils as n_utils
from neutron.conf.db import l3_dvr_db
from neutron.db import l3_attrs_db
@ -738,12 +737,12 @@ class _DVRAgentInterfaceMixin(object):
if router['gw_port_id']:
snat_router_intfs = snat_intfs_by_router_id[router['id']]
LOG.debug("SNAT ports returned: %s ", snat_router_intfs)
router[l3_const.SNAT_ROUTER_INTF_KEY] = snat_router_intfs
router[const.SNAT_ROUTER_INTF_KEY] = snat_router_intfs
if not fip_agent_gw_ports:
fip_agent_gw_ports = self._get_fip_agent_gw_ports(
context, agent.id)
LOG.debug("FIP Agent ports: %s", fip_agent_gw_ports)
router[l3_const.FLOATINGIP_AGENT_INTF_KEY] = (
router[const.FLOATINGIP_AGENT_INTF_KEY] = (
fip_agent_gw_ports)
return routers_dict
@ -755,7 +754,7 @@ class _DVRAgentInterfaceMixin(object):
By default, floating IPs are not bound to any host. Instead
of using the empty string to denote this use a constant.
"""
return floating_ip.get('host', l3_const.FLOATING_IP_HOST_UNBOUND)
return floating_ip.get('host', const.FLOATING_IP_HOST_UNBOUND)
def _process_floating_ips_dvr(self, context, routers_dict,
floating_ips, host, agent):
@ -777,7 +776,7 @@ class _DVRAgentInterfaceMixin(object):
fip_host = self._get_floating_ip_host(floating_ip)
# Skip if it is unbound
if fip_host == l3_const.FLOATING_IP_HOST_UNBOUND:
if fip_host == const.FLOATING_IP_HOST_UNBOUND:
return True
# Skip if the given agent is in the wrong mode - SNAT bound
@ -790,7 +789,7 @@ class _DVRAgentInterfaceMixin(object):
# Skip if it is bound, but not to the given host
fip_dest_host = floating_ip.get('dest_host')
if (fip_host != l3_const.FLOATING_IP_HOST_NEEDS_BINDING and
if (fip_host != const.FLOATING_IP_HOST_NEEDS_BINDING and
fip_host != host and fip_dest_host is None):
return True
@ -879,11 +878,11 @@ class _DVRAgentInterfaceMixin(object):
# Add the port binding host to the floatingip dictionary
for fip in floating_ips:
# Assume no host binding required
fip['host'] = l3_const.FLOATING_IP_HOST_UNBOUND
fip['host'] = const.FLOATING_IP_HOST_UNBOUND
vm_port = port_dict.get(fip['port_id'], None)
if vm_port:
# Default value if host port-binding required
fip['host'] = l3_const.FLOATING_IP_HOST_NEEDS_BINDING
fip['host'] = const.FLOATING_IP_HOST_NEEDS_BINDING
port_host = vm_port[portbindings.HOST_ID]
if port_host:
fip['dest_host'] = (
@ -900,7 +899,7 @@ class _DVRAgentInterfaceMixin(object):
# Handle the case were there is no host binding
# for the private ports that are associated with
# floating ip.
if fip['host'] == l3_const.FLOATING_IP_HOST_NEEDS_BINDING:
if fip['host'] == const.FLOATING_IP_HOST_NEEDS_BINDING:
fip[const.DVR_SNAT_BOUND] = True
routers_dict = self._process_routers(context, routers, agent)
self._process_floating_ips_dvr(context, routers_dict,
@ -909,10 +908,10 @@ class _DVRAgentInterfaceMixin(object):
for router in routers_dict.values():
if router.get('gw_port'):
ports_to_populate.append(router['gw_port'])
if router.get(l3_const.FLOATINGIP_AGENT_INTF_KEY):
ports_to_populate += router[l3_const.FLOATINGIP_AGENT_INTF_KEY]
if router.get(l3_const.SNAT_ROUTER_INTF_KEY):
ports_to_populate += router[l3_const.SNAT_ROUTER_INTF_KEY]
if router.get(const.FLOATINGIP_AGENT_INTF_KEY):
ports_to_populate += router[const.FLOATINGIP_AGENT_INTF_KEY]
if router.get(const.SNAT_ROUTER_INTF_KEY):
ports_to_populate += router[const.SNAT_ROUTER_INTF_KEY]
ports_to_populate += interfaces
self._populate_mtu_and_subnets_for_ports(context, ports_to_populate)
self._process_interfaces(routers_dict, interfaces)

View File

@ -45,7 +45,6 @@ from sqlalchemy import exc as sql_exc
from sqlalchemy import orm
from neutron._i18n import _
from neutron.common import constants as n_const
from neutron.common import utils as n_utils
from neutron.conf.db import l3_hamode_db
from neutron.db import _utils as db_utils
@ -173,7 +172,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
def _create_ha_subnet(self, context, network_id, tenant_id):
args = {'network_id': network_id,
'tenant_id': '',
'name': n_const.HA_SUBNET_NAME % tenant_id,
'name': constants.HA_SUBNET_NAME % tenant_id,
'ip_version': 4,
'cidr': cfg.CONF.l3_ha_net_cidr,
'enable_dhcp': False,
@ -209,7 +208,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
admin_ctx = context.elevated()
args = {'network':
{'name': n_const.HA_NETWORK_NAME % tenant_id,
{'name': constants.HA_NETWORK_NAME % tenant_id,
'tenant_id': '',
'shared': False,
'admin_state_up': True}}
@ -289,7 +288,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
'admin_state_up': True,
'device_id': router_id,
'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
'name': n_const.HA_PORT_NAME % tenant_id}
'name': constants.HA_PORT_NAME % tenant_id}
creation = functools.partial(p_utils.create_port, self._core_plugin,
context, {'port': args})
content = functools.partial(self._create_ha_port_binding, context,
@ -574,9 +573,9 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
for binding in bindings:
if not (binding.agent.is_active and
binding.agent.admin_state_up):
if binding.state == n_const.HA_ROUTER_STATE_ACTIVE:
if binding.state == constants.HA_ROUTER_STATE_ACTIVE:
router_active_agents_dead.append(binding.agent)
elif binding.state == n_const.HA_ROUTER_STATE_STANDBY:
elif binding.state == constants.HA_ROUTER_STATE_STANDBY:
router_standby_agents_dead.append(binding.agent)
if router_active_agents_dead:
# Just check if all l3_agents are down
@ -587,12 +586,12 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
# agent communication may be issue but router
# may still be active. We do not know the
# exact status of router.
state = n_const.HA_ROUTER_STATE_UNKNOWN
state = constants.HA_ROUTER_STATE_UNKNOWN
else:
# Make router status as standby on all dead agents
# as some other agents are alive , router can become
# active on them after some time
state = n_const.HA_ROUTER_STATE_STANDBY
state = constants.HA_ROUTER_STATE_STANDBY
for dead_agent in router_active_agents_dead:
self.update_routers_states(context, {router_id: state},
dead_agent.host)
@ -619,7 +618,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
# and l2pop would not work correctly.
return next(
(agent.host for agent, state in bindings
if state == n_const.HA_ROUTER_STATE_ACTIVE),
if state == constants.HA_ROUTER_STATE_ACTIVE),
None)
@log_helpers.log_method_call
@ -642,7 +641,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
router = routers_dict.get(binding.router_id)
router[constants.HA_INTERFACE_KEY] = port_dict
router[n_const.HA_ROUTER_STATE_KEY] = binding.state
router[constants.HA_ROUTER_STATE_KEY] = binding.state
interfaces = []
for router in routers_dict.values():
@ -707,7 +706,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
ports = self._core_plugin.get_ports(admin_ctx, filters=device_filter)
active_ports = (
port for port in ports
if states[port['device_id']] == n_const.HA_ROUTER_STATE_ACTIVE)
if states[port['device_id']] == constants.HA_ROUTER_STATE_ACTIVE)
for port in active_ports:
try:
@ -742,7 +741,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
"host": gateway_port_binding_host,
"status": gateway_port_status})
for ha_binding_agent, ha_binding_state in ha_bindings:
if ha_binding_state != n_const.HA_ROUTER_STATE_ACTIVE:
if ha_binding_state != constants.HA_ROUTER_STATE_ACTIVE:
continue
# For create router gateway, the gateway port may not be ACTIVE
# yet, so we return 'master' host directly.

View File

@ -13,6 +13,7 @@
# under the License.
import netaddr
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib.exceptions import metering as metering_exc
@ -20,7 +21,6 @@ from oslo_db import exception as db_exc
from oslo_utils import uuidutils
from neutron.api.rpc.agentnotifiers import metering_rpc_agent_api
from neutron.common import constants
from neutron.db import common_db_mixin as base_db
from neutron.db import l3_dvr_db
from neutron.extensions import metering

View File

@ -14,10 +14,10 @@
#
from alembic import op
from neutron_lib import constants as const
from oslo_utils import uuidutils
import sqlalchemy as sa
from neutron.common import constants as const
"""migrate to pluggable ipam """

View File

@ -11,10 +11,9 @@
# under the License.
from alembic import op
from neutron_lib import constants as lib_const
from neutron_lib import constants
import sqlalchemy as sa
from neutron.common import constants
"""Add routerport bindings for L3 HA
@ -59,11 +58,11 @@ def upgrade():
# back-ported
for router_port in session.query(router_ports).filter(
router_ports.c.port_type ==
lib_const.DEVICE_OWNER_ROUTER_HA_INTF):
constants.DEVICE_OWNER_ROUTER_HA_INTF):
router_port_tuples.discard((router_port.router_id,
router_port.port_id))
new_records = [dict(router_id=router_id, port_id=port_id,
port_type=lib_const.DEVICE_OWNER_ROUTER_HA_INTF)
port_type=constants.DEVICE_OWNER_ROUTER_HA_INTF)
for router_id, port_id in router_port_tuples]
op.bulk_insert(router_ports, new_records)
session.commit()

View File

@ -13,9 +13,9 @@
# under the License.
#
from neutron_lib import constants
import sqlalchemy as sa
from neutron.common import constants
from neutron.db import migration
"""Add unknown state to HA router

View File

@ -13,11 +13,11 @@
# under the License.
#
from neutron_lib import constants as n_const
from neutron_lib.db import model_base
import sqlalchemy as sa
from sqlalchemy import orm
from neutron.common import constants as n_const
from neutron.db.models import agent as agent_model
from neutron.db import models_v2

View File

@ -33,7 +33,6 @@ from sqlalchemy.orm import scoped_session
from neutron._i18n import _
from neutron.common import _constants as const
from neutron.common import constants as n_const
from neutron.common import utils
from neutron.db.models import securitygroup as sg_models
from neutron.db import rbac_db_mixin as rbac_mixin
@ -442,8 +441,8 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
# problems with comparing int and string in PostgreSQL. Here this
# string is converted to int to give an opportunity to use it as
# before.
if protocol in n_const.IP_PROTOCOL_NAME_ALIASES:
protocol = n_const.IP_PROTOCOL_NAME_ALIASES[protocol]
if protocol in constants.IP_PROTOCOL_NAME_ALIASES:
protocol = constants.IP_PROTOCOL_NAME_ALIASES[protocol]
return int(constants.IP_PROTOCOL_MAP.get(protocol, protocol))
def _get_ip_proto_name_and_num(self, protocol):
@ -452,8 +451,8 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
protocol = str(protocol)
if protocol in constants.IP_PROTOCOL_MAP:
return [protocol, str(constants.IP_PROTOCOL_MAP.get(protocol))]
elif protocol in n_const.IP_PROTOCOL_NUM_TO_NAME_MAP:
return [n_const.IP_PROTOCOL_NUM_TO_NAME_MAP.get(protocol),
elif protocol in constants.IP_PROTOCOL_NUM_TO_NAME_MAP:
return [constants.IP_PROTOCOL_NUM_TO_NAME_MAP.get(protocol),
protocol]
return [protocol, protocol]

View File

@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants as const
from oslo_utils import uuidutils
from neutron.common import constants as const
from neutron.objects import ipam as ipam_objs
# Database operations for Neutron's DB-backed IPAM driver

View File

@ -15,7 +15,7 @@ import itertools
import uuid
import netaddr
from neutron_lib import constants as lib_constants
from neutron_lib import constants
from neutron_lib.db import constants as lib_db_const
from neutron_lib.objects import exceptions as o_exc
@ -24,7 +24,6 @@ from oslo_versionedobjects import fields as obj_fields
import six
from neutron._i18n import _
from neutron.common import constants
from neutron.common import utils
@ -33,7 +32,7 @@ class HARouterEnumField(obj_fields.AutoTypedField):
class IPV6ModeEnumField(obj_fields.AutoTypedField):
AUTO_TYPE = obj_fields.Enum(valid_values=lib_constants.IPV6_MODES)
AUTO_TYPE = obj_fields.Enum(valid_values=constants.IPV6_MODES)
class RangeConstrainedInteger(obj_fields.Integer):
@ -60,7 +59,7 @@ class IPNetworkPrefixLen(RangeConstrainedInteger):
"""IP network (CIDR) prefix length custom Enum"""
def __init__(self, **kwargs):
super(IPNetworkPrefixLen, self).__init__(
start=0, end=lib_constants.IPv6_BITS,
start=0, end=constants.IPv6_BITS,
**kwargs)
@ -84,8 +83,8 @@ class PortRangeWith0Field(obj_fields.AutoTypedField):
class VlanIdRange(RangeConstrainedInteger):
def __init__(self, **kwargs):
super(VlanIdRange, self).__init__(start=lib_constants.MIN_VLAN_TAG,
end=lib_constants.MAX_VLAN_TAG,
super(VlanIdRange, self).__init__(start=constants.MIN_VLAN_TAG,
end=constants.MAX_VLAN_TAG,
**kwargs)
@ -156,7 +155,7 @@ class IPVersionEnumField(obj_fields.AutoTypedField):
class DscpMark(IntegerEnum):
def __init__(self, valid_values=None, **kwargs):
super(DscpMark, self).__init__(
valid_values=lib_constants.VALID_DSCP_MARKS)
valid_values=constants.VALID_DSCP_MARKS)
class DscpMarkField(obj_fields.AutoTypedField):
@ -164,7 +163,7 @@ class DscpMarkField(obj_fields.AutoTypedField):
class FlowDirectionEnumField(obj_fields.AutoTypedField):
AUTO_TYPE = obj_fields.Enum(valid_values=lib_constants.VALID_DIRECTIONS)
AUTO_TYPE = obj_fields.Enum(valid_values=constants.VALID_DIRECTIONS)
class IpamAllocationStatusEnumField(obj_fields.AutoTypedField):
@ -182,7 +181,7 @@ class IpProtocolEnum(obj_fields.Enum):
super(IpProtocolEnum, self).__init__(
valid_values=list(
itertools.chain(
lib_constants.IP_PROTOCOL_MAP.keys(),
constants.IP_PROTOCOL_MAP.keys(),
[str(v) for v in range(256)]
)
),
@ -317,4 +316,4 @@ class RouterStatusEnumField(obj_fields.AutoTypedField):
class NetworkSegmentRangeNetworkTypeEnumField(obj_fields.AutoTypedField):
AUTO_TYPE = obj_fields.Enum(
valid_values=lib_constants.NETWORK_SEGMENT_RANGE_TYPES)
valid_values=constants.NETWORK_SEGMENT_RANGE_TYPES)

View File

@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from oslo_versionedobjects import fields as obj_fields
from neutron.common import constants
from neutron.db.models import agent as agent_model
from neutron.db.models import l3ha
from neutron.objects import base

View File

@ -16,11 +16,11 @@ import netaddr
from neutron_lib.api.definitions import availability_zone as az_def
from neutron_lib.api.validators import availability_zone as az_validator
from neutron_lib import constants as n_const
from oslo_versionedobjects import fields as obj_fields
import six
from sqlalchemy import func
from neutron.common import constants as n_const
from neutron.common import utils
from neutron.db.models import dvr as dvr_models
from neutron.db.models import l3

View File

@ -15,6 +15,7 @@
import copy
from neutron_lib import constants as const
from oslo_log import log as logging
from oslo_policy import policy as oslo_policy
from oslo_utils import excutils
@ -22,7 +23,6 @@ from pecan import hooks
import webob
from neutron._i18n import _
from neutron.common import constants as const
from neutron.extensions import quotasv2
from neutron import manager
from neutron.pecan_wsgi import constants as pecan_constants

View File

@ -39,7 +39,6 @@ from neutron.agent import securitygroups_rpc as agent_sg_rpc
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import constants as c_const
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import capabilities
from neutron.plugins.ml2.drivers.agent import config as cagt_config # noqa
@ -319,7 +318,7 @@ class CommonAgentLoop(service.Service):
payload=events.DBEventPayload(
self.context, states=(device_details,),
resource_id=device))
elif c_const.NO_ACTIVE_BINDING in device_details:
elif constants.NO_ACTIVE_BINDING in device_details:
LOG.info("Device %s has no active binding in host", device)
else:
LOG.info("Device %s not defined on plugin", device)

View File

@ -40,7 +40,6 @@ from neutron.agent import securitygroups_rpc as agent_sg_rpc
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import constants as c_const
from neutron.common import profiler as setup_profiler
from neutron.common import utils as n_utils
from neutron.plugins.ml2.drivers.mech_sriov.agent.common import config
@ -160,8 +159,9 @@ class SriovNicSwitchAgent(object):
self.connection)
configurations = {'device_mappings': physical_devices_mappings,
c_const.RP_BANDWIDTHS: rp_bandwidths,
c_const.RP_INVENTORY_DEFAULTS: rp_inventory_defaults,
n_constants.RP_BANDWIDTHS: rp_bandwidths,
n_constants.RP_INVENTORY_DEFAULTS:
rp_inventory_defaults,
'extensions': self.ext_manager.names()}
# TODO(mangelajo): optimize resource_versions (see ovs agent)
@ -355,7 +355,7 @@ class SriovNicSwitchAgent(object):
port_id,
(device, profile.get('pci_slot')))
self.ext_manager.handle_port(self.context, device_details)
elif c_const.NO_ACTIVE_BINDING in device_details:
elif n_constants.NO_ACTIVE_BINDING in device_details:
# Port was added but its binding in this agent
# hasn't been activated yet. It will be treated as
# added when binding is activated

View File

@ -23,7 +23,6 @@ import netaddr
from neutron_lib import constants as const
from neutron.common import constants as n_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
import ovs_bridge
@ -134,7 +133,7 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge):
for ip in ip_addresses:
self.install_goto(
table_id=constants.ARP_SPOOF_TABLE, priority=2,
dl_type=n_const.ETHERTYPE_IPV6,
dl_type=const.ETHERTYPE_IPV6,
nw_proto=const.PROTO_NUM_IPV6_ICMP,
icmp_type=const.ICMPV6_TYPE_NA, nd_target=ip, in_port=port,
dest_table_id=constants.TRANSIENT_TABLE)
@ -142,7 +141,7 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge):
# Now that the rules are ready, direct icmpv6 neighbor advertisement
# traffic from the port into the anti-spoof table.
self.add_flow(table=constants.LOCAL_SWITCHING,
priority=10, dl_type=n_const.ETHERTYPE_IPV6,
priority=10, dl_type=const.ETHERTYPE_IPV6,
nw_proto=const.PROTO_NUM_IPV6_ICMP,
icmp_type=const.ICMPV6_TYPE_NA, in_port=port,
actions=("resubmit(,%s)" % constants.ARP_SPOOF_TABLE))

View File

@ -55,7 +55,6 @@ from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.common import config
from neutron.common import constants as c_const
from neutron.common import utils as n_utils
from neutron.conf.agent import common as agent_config
from neutron.conf.agent import xenapi_conf
@ -284,8 +283,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
'host': host,
'topic': n_const.L2_AGENT_TOPIC,
'configurations': {'bridge_mappings': self.bridge_mappings,
c_const.RP_BANDWIDTHS: self.rp_bandwidths,
c_const.RP_INVENTORY_DEFAULTS:
n_const.RP_BANDWIDTHS: self.rp_bandwidths,
n_const.RP_INVENTORY_DEFAULTS:
self.rp_inventory_defaults,
'integration_bridge':
ovs_conf.integration_bridge,
@ -1670,7 +1669,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
details['network_id'])
self.ext_manager.handle_port(self.context, details)
else:
if c_const.NO_ACTIVE_BINDING in details:
if n_const.NO_ACTIVE_BINDING in details:
# Port was added to the bridge, but its binding in this
# agent hasn't been activated yet. It will be treated as
# added when binding is activated

View File

@ -88,7 +88,6 @@ from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import resources_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as n_const
from neutron.common import utils
from neutron.db import address_scope_db
from neutron.db import agents_db
@ -2108,7 +2107,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# REVISIT(rkukura): Consider calling into MechanismDrivers to
# process device names, or having MechanismDrivers supply list
# of device prefixes to strip.
for prefix in n_const.INTERFACE_PREFIXES:
for prefix in const.INTERFACE_PREFIXES:
if device.startswith(prefix):
return device[len(prefix):]
# REVISIT(irenab): Consider calling into bound MD to

View File

@ -30,7 +30,6 @@ from sqlalchemy.orm import exc
from neutron.agent import _topics as n_topics
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.common import constants as c_const
from neutron.db import l3_hamode_db
from neutron.db import provisioning_blocks
from neutron.plugins.ml2 import db as ml2_db
@ -138,7 +137,7 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
"%(host)s", {'device': device,
'host': host})
return {'device': device,
c_const.NO_ACTIVE_BINDING: True}
n_const.NO_ACTIVE_BINDING: True}
network_qos_policy_id = port_context.network._network.get(
qos_consts.QOS_POLICY_ID)

View File

@ -35,7 +35,6 @@ import stevedore
from neutron._i18n import _
from neutron.common import cache_utils as cache
from neutron.common import constants as const
LOG = logging.getLogger(__name__)
@ -108,11 +107,11 @@ def set_rules(policies, overwrite=True):
def _is_attribute_explicitly_set(attribute_name, resource, target, action):
"""Verify that an attribute is present and is explicitly set."""
if target.get(const.ATTRIBUTES_TO_UPDATE):
if target.get(constants.ATTRIBUTES_TO_UPDATE):
# In the case of update, the function should not pay attention to a
# default value of an attribute, but check whether it was explicitly
# marked as being updated instead.
return (attribute_name in target[const.ATTRIBUTES_TO_UPDATE] and
return (attribute_name in target[constants.ATTRIBUTES_TO_UPDATE] and
target[attribute_name] is not constants.ATTR_NOT_SPECIFIED)
result = (attribute_name in target and
target[attribute_name] is not constants.ATTR_NOT_SPECIFIED)
@ -291,9 +290,9 @@ class OwnerCheck(policy.Check):
reason=err_reason)
parent_foreign_key = _RESOURCE_FOREIGN_KEYS.get(
"%ss" % parent_res, None)
if parent_res == const.EXT_PARENT_PREFIX:
if parent_res == constants.EXT_PARENT_PREFIX:
for resource in service_const.EXT_PARENT_RESOURCE_MAPPING:
key = "%s_%s_id" % (const.EXT_PARENT_PREFIX, resource)
key = "%s_%s_id" % (constants.EXT_PARENT_PREFIX, resource)
if key in target:
parent_foreign_key = key
parent_res = resource

View File

@ -30,7 +30,6 @@ from oslo_utils import timeutils
from neutron._i18n import _
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants as n_const
from neutron.conf.agent import common as config
from neutron.conf.services import metering_agent
from neutron import manager
@ -136,7 +135,7 @@ class MeteringAgent(MeteringPluginRpc, manager.Manager):
self.label_tenant_id = {}
for router in self.routers.values():
tenant_id = router['tenant_id']
labels = router.get(n_const.METERING_LABEL_KEY, [])
labels = router.get(constants.METERING_LABEL_KEY, [])
for label in labels:
label_id = label['id']
self.label_tenant_id[label_id] = tenant_id

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from oslo_config import cfg
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
@ -21,7 +22,6 @@ from neutron.agent.l3 import dvr_snat_ns
from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.common import constants
from neutron.common import ipv6_utils
from neutron.conf.agent import common as config
from neutron.services.metering.drivers import abstract_driver

View File

@ -23,7 +23,6 @@ from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.callbacks.producer import registry as rpc_registry
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import constants
from neutron.objects.qos import policy as policy_object
@ -92,10 +91,10 @@ class QosServiceDriverManager(object):
"start": possible_values[0],
"end": possible_values[1]
}
parameter_type = constants.VALUES_TYPE_RANGE
parameter_type = lib_constants.VALUES_TYPE_RANGE
elif validator == 'type:values':
parameter_values = possible_values
parameter_type = constants.VALUES_TYPE_CHOICES
parameter_type = lib_constants.VALUES_TYPE_CHOICES
return parameter_values, parameter_type
def call(self, method_name, *args, **kwargs):

View File

@ -16,12 +16,12 @@ import sys
import types
import mock
from neutron_lib import constants
from oslo_config import cfg
from neutron.agent.l3 import agent
from neutron.agent.l3 import namespaces
from neutron.agent import l3_agent
from neutron.common import constants
class L3NATAgentForTest(agent.L3NATAgentWithStateReport):

View File

@ -19,7 +19,6 @@ import netaddr
from neutron_lib import constants
from oslo_utils import uuidutils
from neutron.common import constants as n_consts
from neutron.common import utils as common_utils
from neutron.plugins.ml2.drivers.openvswitch.agent.common import (
constants as ovs_consts)
@ -82,7 +81,7 @@ class ConnectionTester(fixtures.Fixture):
UDP = net_helpers.NetcatTester.UDP
TCP = net_helpers.NetcatTester.TCP
ICMP = constants.PROTO_NAME_ICMP
ARP = n_consts.ETHERTYPE_NAME_ARP
ARP = constants.ETHERTYPE_NAME_ARP
INGRESS = constants.INGRESS_DIRECTION
EGRESS = constants.EGRESS_DIRECTION

View File

@ -18,7 +18,6 @@ import tempfile
from neutron_lib import constants
from neutron.common import constants as c_const
from neutron.common import utils
from neutron.plugins.ml2.extensions import qos as qos_ext
from neutron.tests import base
@ -208,7 +207,7 @@ class OVSConfigFixture(ConfigFixture):
'tun_peer_patch_port': self._generate_tun_peer()})
else:
if env_desc.report_bandwidths:
self.config['ovs'][c_const.RP_BANDWIDTHS] = \
self.config['ovs'][constants.RP_BANDWIDTHS] = \
'%s:%s:%s' % (ext_dev, MINIMUM_BANDWIDTH_EGRESS_KBPS,
MINIMUM_BANDWIDTH_INGRESS_KBPS)

View File

@ -18,7 +18,6 @@ from neutronclient.common import exceptions as nc_exc
from oslo_config import cfg
from neutron.agent.linux import ip_lib
from neutron.common import constants as common_const
from neutron.common import utils as common_utils
from neutron.plugins.ml2.drivers.linuxbridge.agent import \
linuxbridge_neutron_agent as lb_agent
@ -38,7 +37,7 @@ class EnvironmentDescription(object):
mech_drivers='openvswitch,linuxbridge',
service_plugins='router', arp_responder=False,
agent_down_time=75, router_scheduler=None,
global_mtu=common_const.DEFAULT_NETWORK_MTU,
global_mtu=constants.DEFAULT_NETWORK_MTU,
debug_iptables=False, log=False, report_bandwidths=False):
self.network_type = network_type
self.l2_pop = l2_pop

View File

@ -16,7 +16,6 @@ import functools
from neutron_lib import constants
from neutron.common import constants as c_const
from neutron.common import utils
from neutron.tests.common import net_helpers
from neutron.tests.fullstack import base
@ -67,22 +66,23 @@ class TestAgentBandwidthReport(base.BaseFullStackTestCase):
return False
bridge_or_devices = agent_configurations[mapping_key][physnet]
if (c_const.RP_BANDWIDTHS not in agent_configurations or
c_const.RP_INVENTORY_DEFAULTS not in agent_configurations):
if (constants.RP_BANDWIDTHS not in agent_configurations or
constants.RP_INVENTORY_DEFAULTS not in
agent_configurations):
return False
if mapping_key == self.BR_MAPPINGS:
if (bridge_or_devices not in
agent_configurations[c_const.RP_BANDWIDTHS]):
agent_configurations[constants.RP_BANDWIDTHS]):
return False
else:
for device in bridge_or_devices:
if (device not in
agent_configurations[c_const.RP_BANDWIDTHS]):
agent_configurations[constants.RP_BANDWIDTHS]):
return False
for device in agent_configurations[c_const.RP_BANDWIDTHS]:
conf_device = agent_configurations[c_const.RP_BANDWIDTHS][device]
for device in agent_configurations[constants.RP_BANDWIDTHS]:
conf_device = agent_configurations[constants.RP_BANDWIDTHS][device]
if (f_const.MINIMUM_BANDWIDTH_INGRESS_KBPS !=
conf_device['ingress'] and
f_const.MINIMUM_BANDWIDTH_EGRESS_KBPS !=
@ -98,7 +98,7 @@ class TestAgentBandwidthReport(base.BaseFullStackTestCase):
def _add_new_device_to_agent_config(self, l2_agent_config,
mapping_key_name, new_dev):
old_bw = l2_agent_config[c_const.RP_BANDWIDTHS]
old_bw = l2_agent_config[constants.RP_BANDWIDTHS]
old_mappings = l2_agent_config[mapping_key_name]
if new_dev in old_bw or new_dev in old_mappings:
return
@ -109,7 +109,7 @@ class TestAgentBandwidthReport(base.BaseFullStackTestCase):
f_const.MINIMUM_BANDWIDTH_INGRESS_KBPS)
l2_agent_config[mapping_key_name] = '%s,%s' % (
old_mappings, new_mappings)
l2_agent_config[c_const.RP_BANDWIDTHS] = '%s,%s' % (
l2_agent_config[constants.RP_BANDWIDTHS] = '%s,%s' % (
old_bw, new_bw)
def _change_agent_conf_and_restart_agent(self, l2_agent_config, l2_agent,

View File

@ -34,7 +34,6 @@ from neutron.agent.linux import external_process
from neutron.agent.linux import interface
from neutron.agent.linux import ip_lib
from neutron.agent.linux import keepalived
from neutron.common import constants as n_const
from neutron.common import utils as common_utils
from neutron.conf.agent import common as agent_config
from neutron.conf import common as common_config
@ -240,7 +239,8 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
ip_wrapper = ip_lib.IPWrapper(namespace=router.ns_name)
ra_state = ip_wrapper.netns.execute(['sysctl', '-b',
'net.ipv6.conf.%s.accept_ra' % external_device_name])
self.assertEqual(enabled, int(ra_state) != n_const.ACCEPT_RA_DISABLED)
self.assertEqual(
enabled, int(ra_state) != constants.ACCEPT_RA_DISABLED)
def _wait_until_ipv6_forwarding_has_state(self, ns_name, dev_name, state):

View File

@ -33,7 +33,6 @@ from neutron.agent.l3 import dvr_snat_ns
from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.common import constants as n_const
from neutron.common import utils
from neutron.tests import base as test_base
from neutron.tests.common import l3_test_common
@ -160,7 +159,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
new_fg_port = copy.deepcopy(fg_port)
for subnet in new_fg_port['subnets']:
subnet['gateway_ip'] = '19.4.4.2'
router.router[n_const.FLOATINGIP_AGENT_INTF_KEY] = [new_fg_port]
router.router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = [new_fg_port]
self.assertRaises(l3_exc.FloatingIpSetupException,
self.agent._process_updated_router,
router.router)
@ -212,7 +211,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
new_fg_port = copy.deepcopy(fg_port)
for subnet in new_fg_port['subnets']:
subnet['gateway_ip'] = '19.4.4.2'
router.router[n_const.FLOATINGIP_AGENT_INTF_KEY] = [new_fg_port]
router.router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = [new_fg_port]
self.assertRaises(l3_exc.FloatingIpSetupException,
self.manage_router,
self.agent,
@ -253,7 +252,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
# update_gateway_port to be called without the fg- port
fip_subscribe.return_value = False
fip_ns.agent_gateway_port = (
router_info[n_const.FLOATINGIP_AGENT_INTF_KEY])
router_info[lib_constants.FLOATINGIP_AGENT_INTF_KEY])
# This will raise the exception and will also clear
# subscription for the ext_net_id
self.assertRaises(l3_exc.FloatingIpSetupException,
@ -614,7 +613,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
def _add_fip_agent_gw_port_info_to_router(self, router, external_gw_port):
# Add fip agent gateway port information to the router_info
fip_gw_port_list = router.get(
n_const.FLOATINGIP_AGENT_INTF_KEY, [])
lib_constants.FLOATINGIP_AGENT_INTF_KEY, [])
if not fip_gw_port_list and external_gw_port:
# Get values from external gateway port
fixed_ip = external_gw_port['fixed_ips'][0]
@ -624,7 +623,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
fip_gw_port_ip = str(netaddr.IPAddress(port_ip) + 5)
# Add floatingip agent gateway port info to router
prefixlen = netaddr.IPNetwork(float_subnet['cidr']).prefixlen
router[n_const.FLOATINGIP_AGENT_INTF_KEY] = [
router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = [
{'subnets': [
{'cidr': float_subnet['cidr'],
'gateway_ip': float_subnet['gateway_ip'],
@ -643,9 +642,9 @@ class TestDvrRouter(framework.L3AgentTestFramework):
def _add_snat_port_info_to_router(self, router, internal_ports):
# Add snat port information to the router
snat_port_list = router.get(n_const.SNAT_ROUTER_INTF_KEY, [])
snat_port_list = router.get(lib_constants.SNAT_ROUTER_INTF_KEY, [])
if not snat_port_list and internal_ports:
router[n_const.SNAT_ROUTER_INTF_KEY] = []
router[lib_constants.SNAT_ROUTER_INTF_KEY] = []
for port in internal_ports:
# Get values from internal port
fixed_ip = port['fixed_ips'][0]
@ -672,7 +671,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
# Get the address scope if there is any
if 'address_scopes' in port:
snat_router_port['address_scopes'] = port['address_scopes']
router[n_const.SNAT_ROUTER_INTF_KEY].append(
router[lib_constants.SNAT_ROUTER_INTF_KEY].append(
snat_router_port)
def _assert_dvr_external_device(self, router):
@ -754,7 +753,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
# We need to fetch the floatingip agent gateway port info
# from the router_info
floating_agent_gw_port = (
router.router[n_const.FLOATINGIP_AGENT_INTF_KEY])
router.router[lib_constants.FLOATINGIP_AGENT_INTF_KEY])
self.assertTrue(floating_agent_gw_port)
external_gw_port = floating_agent_gw_port[0]
@ -800,7 +799,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.agent.conf.agent_mode = 'dvr_snat'
router_info = self.generate_dvr_router_info(
enable_snat=True, enable_ha=True, enable_gw=True)
fip_agent_gw_port = router_info[n_const.FLOATINGIP_AGENT_INTF_KEY]
fip_agent_gw_port = router_info[
lib_constants.FLOATINGIP_AGENT_INTF_KEY]
self.mock_plugin_api.get_agent_gateway_port.return_value = (
fip_agent_gw_port[0])
router1 = self.manage_router(self.agent, router_info)
@ -1048,11 +1048,12 @@ class TestDvrRouter(framework.L3AgentTestFramework):
"""
self.agent.conf.agent_mode = 'dvr'
router_info = self.generate_dvr_router_info()
fip_agent_gw_port = router_info[n_const.FLOATINGIP_AGENT_INTF_KEY]
fip_agent_gw_port = router_info[
lib_constants.FLOATINGIP_AGENT_INTF_KEY]
# Now let us not pass the FLOATINGIP_AGENT_INTF_KEY, to emulate
# that the server did not create the port, since there was no valid
# host binding.
router_info[n_const.FLOATINGIP_AGENT_INTF_KEY] = []
router_info[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = []
self.mock_plugin_api.get_agent_gateway_port.return_value = (
fip_agent_gw_port[0])
router1 = self.manage_router(self.agent, router_info)
@ -1252,10 +1253,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
"""
self.agent.conf.agent_mode = 'dvr_snat'
router_info = self.generate_dvr_router_info()
snat_internal_port = router_info[n_const.SNAT_ROUTER_INTF_KEY]
snat_internal_port = router_info[lib_constants.SNAT_ROUTER_INTF_KEY]
router1 = self.manage_router(self.agent, router_info)
csnat_internal_port = (
router1.router[n_const.SNAT_ROUTER_INTF_KEY])
router1.router[lib_constants.SNAT_ROUTER_INTF_KEY])
# Now save the internal device name to verify later
internal_device_name = router1._get_snat_int_device_name(
csnat_internal_port[0]['id'])
@ -1271,7 +1272,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
# Now let us not pass the SNAT_ROUTER_INTF_KEY, to emulate
# that the server did not send it, since the interface has been
# removed.
router1.router[n_const.SNAT_ROUTER_INTF_KEY] = []
router1.router[lib_constants.SNAT_ROUTER_INTF_KEY] = []
self.agent._process_updated_router(router1.router)
router_updated = self.agent.router_info[router_info['id']]
self._assert_snat_namespace_exists(router_updated)
@ -1752,7 +1753,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self, router, agent, address_scopes, agent_gw_port,
rfp_device, fpr_device, no_external=False):
router.router[n_const.SNAT_ROUTER_INTF_KEY] = []
router.router[lib_constants.SNAT_ROUTER_INTF_KEY] = []
router.router['gw_port'] = ""
router.router['gw_port_host'] = ""
self.agent._process_updated_router(router.router)
@ -1802,7 +1803,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
router_info[lib_constants.INTERFACE_KEY][1]['address_scopes'] = (
address_scope2)
# should have the same address_scopes as gw_port
fip_agent_gw_ports = router_info[n_const.FLOATINGIP_AGENT_INTF_KEY]
fip_agent_gw_ports = router_info[
lib_constants.FLOATINGIP_AGENT_INTF_KEY]
fip_agent_gw_ports[0]['address_scopes'] = (
router_info['gw_port']['address_scopes'])
self.mock_plugin_api.get_agent_gateway_port.return_value = (
@ -1932,7 +1934,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
for subnet in new_fg_port['subnets']:
subnet['gateway_ip'] = None
router.router[n_const.FLOATINGIP_AGENT_INTF_KEY] = [new_fg_port]
router.router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = [new_fg_port]
router.process()
self.assertIsNone(ex_gw_device.route.get_gateway())
self.assertIsNone(fg_device.route.get_gateway())
@ -1962,7 +1964,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
router_info['gw_port']['address_scopes'] = {
str(lib_constants.IP_VERSION_4): gw_address_scope}
fip_agent_gw_ports = router_info[
n_const.FLOATINGIP_AGENT_INTF_KEY]
lib_constants.FLOATINGIP_AGENT_INTF_KEY]
fip_agent_gw_ports[0]['address_scopes'] = router_info['gw_port'][
'address_scopes']
router_info[lib_constants.INTERFACE_KEY][0]['address_scopes'] = (
@ -1970,7 +1972,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
router_info[lib_constants.INTERFACE_KEY][1]['address_scopes'] = (
address_scope2)
# Renew the address scope
router_info[n_const.SNAT_ROUTER_INTF_KEY] = []
router_info[lib_constants.SNAT_ROUTER_INTF_KEY] = []
self._add_snat_port_info_to_router(
router_info, router_info[lib_constants.INTERFACE_KEY])

View File

@ -31,7 +31,6 @@ from neutron.agent.linux import iptables_firewall
from neutron.agent.linux import openvswitch_firewall
from neutron.agent.linux.openvswitch_firewall import constants as ovsfw_consts
from neutron.cmd.sanity import checks
from neutron.common import constants as n_const
from neutron.conf.agent import securitygroups_rpc as security_config
from neutron.tests.common import conn_testers
from neutron.tests.common import helpers
@ -115,7 +114,7 @@ class BaseFirewallTestCase(linux_base.BaseOVSLinuxTestCase):
def initialize_iptables(self):
cfg.CONF.set_override('enable_ipset', self.enable_ipset,
'SECURITYGROUP')
br_name = ('brq' + self.net_id)[:n_const.LINUX_DEV_LEN]
br_name = ('brq' + self.net_id)[:constants.LINUX_DEV_LEN]
tester = self.useFixture(
conn_testers.LinuxBridgeConnectionTester(self.ip_cidr,
bridge_name=br_name))

View File

@ -25,7 +25,6 @@ from neutron_lib import constants
from neutron_lib import context
from neutron.api.rpc.handlers import l3_rpc
from neutron.common import constants as n_const
from neutron.tests.common import helpers
from neutron.tests.functional import base as functional_base
from neutron.tests.unit.plugins.ml2 import base as ml2_test_base
@ -266,7 +265,8 @@ class L3DvrTestCase(L3DvrTestCaseBase):
self.assertEqual(
router['id'], result[router['id']]['id'])
self.assertIn(n_const.FLOATINGIP_AGENT_INTF_KEY, result[router['id']])
self.assertIn(constants.FLOATINGIP_AGENT_INTF_KEY,
result[router['id']])
self.l3_plugin._get_fip_agent_gw_ports.assert_called_once_with(
self.context, self.l3_agent['id'])
self.l3_plugin._get_snat_sync_interfaces.assert_called_once_with(
@ -864,7 +864,7 @@ class L3DvrTestCase(L3DvrTestCaseBase):
floatingips = router_info[0][constants.FLOATINGIP_KEY]
self.assertEqual(1, len(floatingips))
self.assertTrue(floatingips[0][constants.DVR_SNAT_BOUND])
self.assertEqual(n_const.FLOATING_IP_HOST_NEEDS_BINDING,
self.assertEqual(constants.FLOATING_IP_HOST_NEEDS_BINDING,
floatingips[0]['host'])
router1_info = (
self.l3_plugin.list_active_sync_routers_on_active_l3_agent(

View File

@ -30,7 +30,6 @@ from oslo_utils import netutils
from oslo_utils import timeutils
import unittest2
from neutron.common import constants as n_const
from neutron.services.logapi.common import constants as log_const
@ -238,8 +237,8 @@ def get_random_prefixlen(version=4):
return random.randint(0, maxlen)
def get_random_port(start=n_const.PORT_RANGE_MIN):
return random.randint(start, n_const.PORT_RANGE_MAX)
def get_random_port(start=constants.PORT_RANGE_MIN):
return random.randint(start, constants.PORT_RANGE_MAX)
def get_random_vlan():
@ -247,7 +246,7 @@ def get_random_vlan():
def get_random_ip_version():
return random.choice(n_const.IP_ALLOWED_VERSIONS)
return random.choice(constants.IP_ALLOWED_VERSIONS)
def get_random_EUI():
@ -275,11 +274,11 @@ def get_random_ip_address(version=4):
def get_random_router_status():
return random.choice(n_const.VALID_ROUTER_STATUS)
return random.choice(constants.VALID_ROUTER_STATUS)
def get_random_floatingip_status():
return random.choice(n_const.VALID_FLOATINGIP_STATUS)
return random.choice(constants.VALID_FLOATINGIP_STATUS)
def get_random_flow_direction():
@ -287,15 +286,15 @@ def get_random_flow_direction():
def get_random_ha_states():
return random.choice(n_const.VALID_HA_STATES)
return random.choice(constants.VALID_HA_STATES)
def get_random_ether_type():
return random.choice(n_const.VALID_ETHERTYPES)
return random.choice(constants.VALID_ETHERTYPES)
def get_random_ipam_status():
return random.choice(n_const.VALID_IPAM_ALLOCATION_STATUSES)
return random.choice(constants.VALID_IPAM_ALLOCATION_STATUSES)
def get_random_ip_protocol():
@ -303,7 +302,7 @@ def get_random_ip_protocol():
def get_random_port_binding_statuses():
return random.choice(n_const.PORT_BINDING_STATUSES)
return random.choice(constants.PORT_BINDING_STATUSES)
def get_random_network_segment_range_network_type():

View File

@ -26,7 +26,6 @@ from neutron.agent.linux import iptables_manager
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import constants
from neutron.objects import port_forwarding as pf_obj
from neutron.objects import router
from neutron.tests import base
@ -146,7 +145,7 @@ class FipPortForwardingExtensionTestCase(PortForwardingExtensionBaseTestCase):
def _get_chainrule_tag_from_pf_obj(self, target_obj):
rule_tag = 'fip_portforwarding-' + target_obj.id
chain_name = (
'pf-' + target_obj.id)[:constants.MAX_IPTABLES_CHAIN_LEN_WRAP]
'pf-' + target_obj.id)[:lib_const.MAX_IPTABLES_CHAIN_LEN_WRAP]
chain_rule = (chain_name,
'-d %s/32 -p %s -m %s --dport %s '
'-j DNAT --to-destination %s:%s' % (
@ -237,7 +236,7 @@ class FipPortForwardingExtensionTestCase(PortForwardingExtensionBaseTestCase):
mock_ip_device.return_value = mock_delete
self.fip_pf_ext.update_router(self.context, self.router)
current_chain = ('pf-' + self.portforwarding1.id)[
:constants.MAX_IPTABLES_CHAIN_LEN_WRAP]
:lib_const.MAX_IPTABLES_CHAIN_LEN_WRAP]
mock_remove_chain.assert_called_once_with(current_chain)
mock_delete.delete_socket_conntrack_state.assert_called_once_with(
str(self.portforwarding1.floating_ip_address),
@ -268,7 +267,7 @@ class FipPortForwardingExtensionTestCase(PortForwardingExtensionBaseTestCase):
mock_ip_device.return_value = mock_device
self.fip_pf_ext.update_router(self.context, self.router)
current_chain = ('pf-' + self.portforwarding1.id)[
:constants.MAX_IPTABLES_CHAIN_LEN_WRAP]
:lib_const.MAX_IPTABLES_CHAIN_LEN_WRAP]
mock_remove_chain.assert_called_once_with(current_chain)
mock_device.delete_socket_conntrack_state.assert_called_once_with(
str(self.portforwarding1.floating_ip_address),

View File

@ -51,7 +51,6 @@ from neutron.agent.linux import pd
from neutron.agent.linux import ra
from neutron.agent.metadata import driver as metadata_driver
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as n_const
from neutron.conf.agent import common as agent_config
from neutron.conf.agent.l3 import config as l3_config
from neutron.conf.agent.l3 import ha as ha_conf
@ -319,8 +318,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router_info.ha_state = 'master'
with mock.patch.object(agent.state_change_notifier,
'queue_event') as queue_event:
agent.check_ha_state_for_router(router.id,
n_const.HA_ROUTER_STATE_STANDBY)
agent.check_ha_state_for_router(
router.id, lib_constants.HA_ROUTER_STATE_STANDBY)
queue_event.assert_called_once_with((router.id, 'master'))
def test_check_ha_state_for_router_standby_standby(self):
@ -332,8 +331,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router_info.ha_state = 'backup'
with mock.patch.object(agent.state_change_notifier,
'queue_event') as queue_event:
agent.check_ha_state_for_router(router.id,
n_const.HA_ROUTER_STATE_STANDBY)
agent.check_ha_state_for_router(
router.id, lib_constants.HA_ROUTER_STATE_STANDBY)
queue_event.assert_not_called()
def test_periodic_sync_routers_task_raise_exception(self):
@ -1040,7 +1039,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
'-o %s -j SNAT --to-source %s' % (interface_name, source_nat_ip),
'-m mark ! --mark 0x2/%s -m conntrack --ctstate DNAT '
'-j SNAT --to-source %s' %
(n_const.ROUTER_MARK_MASK, source_nat_ip)]
(lib_constants.ROUTER_MARK_MASK, source_nat_ip)]
for r in nat_rules:
if negate:
self.assertNotIn(r.rule, expected_rules)
@ -1048,7 +1047,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
self.assertIn(r.rule, expected_rules)
expected_rules = [
'-i %s -j MARK --set-xmark 0x2/%s' %
(interface_name, n_const.ROUTER_MARK_MASK),
(interface_name, lib_constants.ROUTER_MARK_MASK),
'-o %s -m connmark --mark 0x0/%s -j CONNMARK '
'--save-mark --nfmask %s --ctmask %s' %
(interface_name,
@ -1281,7 +1280,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router = l3_test_common.prepare_router_data(enable_snat=True)
router[lib_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips']
router[n_const.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router['distributed'] = True
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self._set_ri_kwargs(agent, router['id'], router)
@ -1346,7 +1345,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router = l3_test_common.prepare_router_data(enable_snat=True)
router[lib_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips']
router[n_const.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router['distributed'] = True
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self._set_ri_kwargs(agent, router['id'], router)
@ -1412,7 +1411,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router = l3_test_common.prepare_router_data(enable_snat=True)
router[lib_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips']
router[n_const.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router['distributed'] = True
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self._set_ri_kwargs(agent, router['id'], router)
@ -1489,7 +1488,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router = l3_test_common.prepare_router_data(enable_snat=True)
router[lib_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips']
router[n_const.FLOATINGIP_AGENT_INTF_KEY] = []
router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = []
router['distributed'] = True
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self._set_ri_kwargs(agent, router['id'], router)
@ -1536,7 +1535,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router = l3_test_common.prepare_router_data(enable_snat=True)
router[lib_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips']
router[n_const.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router['distributed'] = True
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self._set_ri_kwargs(agent, router['id'], router)
@ -1589,7 +1588,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router = l3_test_common.prepare_router_data(enable_snat=True)
router[lib_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips']
router[n_const.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router['distributed'] = True
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self._set_ri_kwargs(agent, router['id'], router)
@ -2298,7 +2297,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
snat_rule2 = ("-A %s-snat -m mark ! --mark 0x2/%s "
"-m conntrack --ctstate DNAT "
"-j SNAT --to-source %s") % (
wrap_name, n_const.ROUTER_MARK_MASK,
wrap_name, lib_constants.ROUTER_MARK_MASK,
ex_gw_port['fixed_ips'][0]['ip_address'])
self.assertIn(jump_float_rule, nat_rules)
@ -2311,7 +2310,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
mangle_rules = list(map(str, ri.iptables_manager.ipv4['mangle'].rules))
mangle_rule = ("-A %s-mark -i iface "
"-j MARK --set-xmark 0x2/%s" %
(wrap_name, n_const.ROUTER_MARK_MASK))
(wrap_name, lib_constants.ROUTER_MARK_MASK))
self.assertIn(mangle_rule, mangle_rules)
def test_process_router_delete_stale_internal_devices(self):
@ -3812,7 +3811,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
{'interface_name':
namespaces.INTERNAL_DEV_PREFIX + '+',
'value': self.conf.metadata_access_mark,
'mask': n_const.ROUTER_MARK_MASK}),
'mask': lib_constants.ROUTER_MARK_MASK}),
mock.call.add_rule(
'float-snat',
'-m connmark --mark 0x0/0xffff0000 '
@ -3857,7 +3856,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
{'interface_name':
namespaces.INTERNAL_DEV_PREFIX + '+',
'value': self.conf.metadata_access_mark,
'mask': n_const.ROUTER_MARK_MASK})])
'mask': lib_constants.ROUTER_MARK_MASK})])
mock_iptables_manager.ipv4['mangle'].assert_has_calls(v4_mangle_calls,
any_order=True)

View File

@ -27,7 +27,6 @@ from neutron.agent.l3 import link_local_allocator as lla
from neutron.agent.l3 import router_info
from neutron.agent.linux import interface
from neutron.agent.linux import ip_lib
from neutron.common import constants as n_const
from neutron.common import utils as common_utils
from neutron.conf.agent import common as agent_config
from neutron.conf.agent.l3 import config as l3_config
@ -648,7 +647,7 @@ class TestDvrRouterOperations(base.BaseTestCase):
)
router = l3_test_common.prepare_router_data(enable_snat=True)
router[n_const.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = agent_gateway_port
router['distributed'] = True
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self._set_ri_kwargs(agent, router['id'], router)

View File

@ -24,7 +24,6 @@ from neutron.agent.common import utils
from neutron.agent.linux.openvswitch_firewall import constants as ovsfw_consts
from neutron.agent.linux.openvswitch_firewall import exceptions
from neutron.agent.linux.openvswitch_firewall import firewall as ovsfw
from neutron.common import constants as n_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants \
as ovs_consts
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
@ -531,7 +530,7 @@ class TestOVSFirewallDriver(base.BaseTestCase):
'output:{:d},resubmit(,{:d})'.format(
self.port_ofport,
ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE),
dl_type="0x{:04x}".format(n_const.ETHERTYPE_IP),
dl_type="0x{:04x}".format(constants.ETHERTYPE_IP),
nw_proto=constants.PROTO_NUM_TCP,
priority=77,
reg5=self.port_ofport,
@ -576,7 +575,7 @@ class TestOVSFirewallDriver(base.BaseTestCase):
filter_rules = [mock.call(
actions='resubmit(,{:d})'.format(
ovs_consts.ACCEPT_OR_INGRESS_TABLE),
dl_type="0x{:04x}".format(n_const.ETHERTYPE_IP),
dl_type="0x{:04x}".format(constants.ETHERTYPE_IP),
nw_proto=constants.PROTO_NUM_UDP,
priority=77,
ct_state=ovsfw_consts.OF_STATE_NEW_NOT_ESTABLISHED,

View File

@ -18,7 +18,6 @@ from neutron_lib import constants
from neutron.agent.linux.openvswitch_firewall import constants as ovsfw_consts
from neutron.agent.linux.openvswitch_firewall import firewall as ovsfw
from neutron.agent.linux.openvswitch_firewall import rules
from neutron.common import constants as n_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants \
as ovs_consts
from neutron.tests import base
@ -77,7 +76,7 @@ class TestCreateFlowsFromRuleAndPort(base.BaseTestCase):
}
expected_template = {
'priority': 74,
'dl_type': n_const.ETHERTYPE_IP,
'dl_type': constants.ETHERTYPE_IP,
'reg_port': self.port.ofport,
}
self._test_create_flows_from_rule_and_port_helper(rule,
@ -92,7 +91,7 @@ class TestCreateFlowsFromRuleAndPort(base.BaseTestCase):
}
expected_template = {
'priority': 74,
'dl_type': n_const.ETHERTYPE_IP,
'dl_type': constants.ETHERTYPE_IP,
'reg_port': self.port.ofport,
'nw_src': '192.168.0.0/24',
'nw_dst': '10.0.0.1/32',
@ -109,7 +108,7 @@ class TestCreateFlowsFromRuleAndPort(base.BaseTestCase):
}
expected_template = {
'priority': 74,
'dl_type': n_const.ETHERTYPE_IP,
'dl_type': constants.ETHERTYPE_IP,
'reg_port': self.port.ofport,
'nw_src': '192.168.0.0/24',
}
@ -123,7 +122,7 @@ class TestCreateFlowsFromRuleAndPort(base.BaseTestCase):
}
expected_template = {
'priority': 74,
'dl_type': n_const.ETHERTYPE_IPV6,
'dl_type': constants.ETHERTYPE_IPV6,
'reg_port': self.port.ofport,
}
self._test_create_flows_from_rule_and_port_helper(rule,
@ -138,7 +137,7 @@ class TestCreateFlowsFromRuleAndPort(base.BaseTestCase):
}
expected_template = {
'priority': 74,
'dl_type': n_const.ETHERTYPE_IPV6,
'dl_type': constants.ETHERTYPE_IPV6,
'reg_port': self.port.ofport,
'ipv6_src': '2001:db8:bbbb::1/64',
'ipv6_dst': '2001:db8:aaaa::1/64',
@ -155,7 +154,7 @@ class TestCreateFlowsFromRuleAndPort(base.BaseTestCase):
}
expected_template = {
'priority': 74,
'dl_type': n_const.ETHERTYPE_IPV6,
'dl_type': constants.ETHERTYPE_IPV6,
'reg_port': self.port.ofport,
'ipv6_src': '2001:db8:bbbb::1/64',
}
@ -343,7 +342,7 @@ class TestCreateFlowsForIpAddress(base.BaseTestCase):
expected_template = {
'table': ovs_consts.RULES_EGRESS_TABLE,
'priority': 72,
'dl_type': n_const.ETHERTYPE_IP,
'dl_type': constants.ETHERTYPE_IP,
'reg_net': 0x123,
'nw_dst': '192.168.0.1/32'
}
@ -377,7 +376,7 @@ class TestCreateConjFlows(base.BaseTestCase):
conj_id = 1234
expected_template = {
'table': ovs_consts.RULES_INGRESS_TABLE,
'dl_type': n_const.ETHERTYPE_IPV6,
'dl_type': constants.ETHERTYPE_IPV6,
'priority': 71,
'conj_id': conj_id,
'reg_port': port.ofport

View File

@ -18,6 +18,7 @@ import sys
import fixtures
import mock
from neutron_lib import constants
from neutron_lib import exceptions
from neutron_lib.exceptions import l3 as l3_exc
from oslo_config import cfg
@ -27,7 +28,6 @@ from neutron._i18n import _
from neutron.agent.linux import iptables_comments as ic
from neutron.agent.linux import iptables_manager
from neutron.agent.linux import utils as linux_utils
from neutron.common import constants
from neutron.tests import base
from neutron.tests import tools

View File

@ -14,12 +14,12 @@
# under the License.
import mock
from neutron_lib import constants
from neutron_lib.exceptions import qos as qos_exc
from neutron_lib.services.qos import constants as qos_consts
from pyroute2.netlink import rtnl
from neutron.agent.linux import tc_lib
from neutron.common import constants
from neutron.common import utils
from neutron.privileged.agent.linux import tc_lib as priv_tc_lib
from neutron.tests import base

View File

@ -16,6 +16,7 @@
import os
import mock
from neutron_lib import constants
from oslo_config import cfg
from oslo_utils import uuidutils
@ -23,7 +24,6 @@ from neutron.agent.l3 import agent as l3_agent
from neutron.agent.l3 import router_info
from neutron.agent.linux import iptables_manager
from neutron.agent.metadata import driver as metadata_driver
from neutron.common import constants
from neutron.conf.agent import common as agent_config
from neutron.conf.agent.l3 import config as l3_config
from neutron.conf.agent.l3 import ha as ha_conf

View File

@ -26,7 +26,6 @@ from oslo_context import context as oslo_context
from oslo_utils import uuidutils
from neutron.agent import rpc
from neutron.common import constants as n_const
from neutron.objects import network
from neutron.objects import ports
from neutron.tests import base
@ -291,7 +290,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
self.assertEqual(self._port_id, entry['device'])
self.assertEqual(self._port_id, entry['port_id'])
self.assertEqual(self._network_id, entry['network_id'])
self.assertNotIn(n_const.NO_ACTIVE_BINDING, entry)
self.assertNotIn(constants.NO_ACTIVE_BINDING, entry)
def test_get_device_details_binding_not_in_host(self):
self._api.remote_resource_cache.get_resource_by_id.side_effect = [
@ -301,4 +300,4 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
self.assertEqual(self._port_id, entry['device'])
self.assertNotIn('port_id', entry)
self.assertNotIn('network_id', entry)
self.assertIn(n_const.NO_ACTIVE_BINDING, entry)
self.assertIn(constants.NO_ACTIVE_BINDING, entry)

View File

@ -27,7 +27,6 @@ import six
import testscenarios
import testtools
from neutron.common import constants as common_constants
from neutron.common import utils
from neutron.tests import base
from neutron.tests.unit import tests
@ -490,7 +489,7 @@ class BaseUnitConversionTest(object):
class TestSIUnitConversions(BaseUnitConversionTest, base.BaseTestCase):
base_unit = common_constants.SI_BASE
base_unit = constants.SI_BASE
def test_bits_to_kilobits(self):
test_values = [
@ -509,7 +508,7 @@ class TestSIUnitConversions(BaseUnitConversionTest, base.BaseTestCase):
class TestIECUnitConversions(BaseUnitConversionTest, base.BaseTestCase):
base_unit = common_constants.IEC_BASE
base_unit = constants.IEC_BASE
def test_bits_to_kilobits(self):
test_values = [

View File

@ -39,7 +39,6 @@ import testtools
from neutron.agent.common import utils as agent_utils
from neutron.api.rpc.handlers import l3_rpc
from neutron.common import constants as n_const
from neutron.db import agents_db
from neutron.db import common_db_mixin
from neutron.db import l3_agentschedulers_db
@ -191,56 +190,56 @@ class L3HATestCase(L3HATestFramework):
self):
router = self._create_router()
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_ACTIVE},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_ACTIVE},
self.agent1['host'])
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_ACTIVE},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_ACTIVE},
self.agent2['host'])
with mock.patch.object(agent_utils, 'is_agent_down',
return_value=True):
self._assert_ha_state_for_agent(router, self.agent1,
n_const.HA_ROUTER_STATE_UNKNOWN)
constants.HA_ROUTER_STATE_UNKNOWN)
def test_get_l3_bindings_hosting_router_agents_admin_state_up_is_false(
self):
router = self._create_router()
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_ACTIVE},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_ACTIVE},
self.agent1['host'])
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_ACTIVE},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_ACTIVE},
self.agent2['host'])
helpers.set_agent_admin_state(self.agent1['id'], admin_state_up=False)
self._assert_ha_state_for_agent(router, self.agent1,
n_const.HA_ROUTER_STATE_STANDBY)
constants.HA_ROUTER_STATE_STANDBY)
def test_get_l3_bindings_hosting_router_agents_admin_state_up_is_true(
self):
router = self._create_router()
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_ACTIVE},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_ACTIVE},
self.agent1['host'])
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_ACTIVE},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_ACTIVE},
self.agent2['host'])
helpers.set_agent_admin_state(self.agent1['id'], admin_state_up=True)
self._assert_ha_state_for_agent(router, self.agent1,
n_const.HA_ROUTER_STATE_ACTIVE)
constants.HA_ROUTER_STATE_ACTIVE)
def test_get_l3_bindings_hosting_router_with_ha_states_one_dead(self):
router = self._create_router()
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_ACTIVE},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_ACTIVE},
self.agent1['host'])
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_STANDBY},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_STANDBY},
self.agent2['host'])
with mock.patch.object(agent_utils, 'is_agent_down',
return_value=True):
# With above mock all agents are in dead state
# hence router state is Unknown overall.
self._assert_ha_state_for_agent(
router, self.agent1, n_const.HA_ROUTER_STATE_UNKNOWN)
router, self.agent1, constants.HA_ROUTER_STATE_UNKNOWN)
def test_ha_router_create(self):
router = self._create_router()
@ -773,7 +772,7 @@ class L3HATestCase(L3HATestFramework):
routers = self.plugin.get_ha_sync_data_for_host(
self.admin_ctx, self.agent1['host'], self.agent1)
for router in routers:
self.assertEqual('standby', router[n_const.HA_ROUTER_STATE_KEY])
self.assertEqual('standby', router[constants.HA_ROUTER_STATE_KEY])
states = {router1['id']: 'active',
router2['id']: 'standby'}
@ -784,7 +783,7 @@ class L3HATestCase(L3HATestFramework):
self.admin_ctx, self.agent1['host'], self.agent1)
for router in routers:
self.assertEqual(states[router['id']],
router[n_const.HA_ROUTER_STATE_KEY])
router[constants.HA_ROUTER_STATE_KEY])
def test_sync_ha_router_info_ha_interface_port_concurrently_deleted(self):
ctx = self.admin_ctx
@ -851,7 +850,7 @@ class L3HATestCase(L3HATestFramework):
router2['id']: 'active'})
routers = self.plugin.get_ha_sync_data_for_host(
self.admin_ctx, self.agent1['host'], self.agent1)
self.assertEqual('active', routers[0][n_const.HA_ROUTER_STATE_KEY])
self.assertEqual('active', routers[0][constants.HA_ROUTER_STATE_KEY])
def test_update_routers_states_port_not_found(self):
router1 = self._create_router()
@ -1384,10 +1383,10 @@ class L3HAModeDbTestCase(L3HATestFramework):
interface_info)
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_ACTIVE},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_ACTIVE},
self.agent1['host'])
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_STANDBY},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_STANDBY},
self.agent2['host'])
routers = self.plugin._get_sync_routers(self.admin_ctx,
@ -1395,10 +1394,10 @@ class L3HAModeDbTestCase(L3HATestFramework):
self.assertEqual(self.agent1['host'], routers[0]['gw_port_host'])
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_STANDBY},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_STANDBY},
self.agent1['host'])
self.plugin.update_routers_states(
self.admin_ctx, {router['id']: n_const.HA_ROUTER_STATE_ACTIVE},
self.admin_ctx, {router['id']: constants.HA_ROUTER_STATE_ACTIVE},
self.agent2['host'])
routers = self.plugin._get_sync_routers(self.admin_ctx,
router_ids=[router['id']])

View File

@ -21,7 +21,6 @@ from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import directory
from oslo_utils import uuidutils
from neutron.common import constants as n_const
from neutron.ipam.drivers.neutrondb_ipam import driver
from neutron.ipam import exceptions as ipam_exc
from neutron.ipam import requests as ipam_req
@ -240,7 +239,7 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
# set ip version to 6 regardless of what's been passed to the
# method
ip_version = constants.IP_VERSION_6
v6_address_mode = n_const.IPV6_SLAAC
v6_address_mode = constants.IPV6_SLAAC
subnet = self._create_subnet(
self.plugin, self.ctx, self.net_id, cidr,
ip_version=ip_version,

View File

@ -19,7 +19,6 @@ from neutron_lib.db import constants as db_consts
from neutron_lib.services.qos import constants as qos_consts
from oslo_config import cfg
from neutron.common import constants
from neutron import manager
from neutron.objects.qos import rule_type
from neutron.services.qos import qos_plugin
@ -31,15 +30,15 @@ DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
DRIVER_SUPPORTED_PARAMETERS = [
{
'parameter_name': qos_consts.MAX_KBPS,
'parameter_type': constants.VALUES_TYPE_RANGE,
'parameter_type': lib_consts.VALUES_TYPE_RANGE,
'parameter_values': {"start": 0, "end": db_consts.DB_INTEGER_MAX_VALUE}
}, {
'parameter_name': qos_consts.MAX_BURST,
'parameter_type': constants.VALUES_TYPE_RANGE,
'parameter_type': lib_consts.VALUES_TYPE_RANGE,
'parameter_values': {"start": 0, "end": db_consts.DB_INTEGER_MAX_VALUE}
}, {
'parameter_name': qos_consts.DIRECTION,
'parameter_type': constants.VALUES_TYPE_CHOICES,
'parameter_type': lib_consts.VALUES_TYPE_CHOICES,
'parameter_values': lib_consts.VALID_DIRECTIONS
}
]

View File

@ -19,7 +19,6 @@ from neutron_lib.db import constants as db_const
from neutron_lib.utils import net
from oslo_serialization import jsonutils
from neutron.common import constants
from neutron.objects import common_types
from neutron.tests import base as test_base
from neutron.tests import tools
@ -136,7 +135,7 @@ class IPNetworkFieldTest(test_base.BaseTestCase, TestField):
self.field = common_types.IPNetworkField()
addrs = [
tools.get_random_ip_network(version=ip_version)
for ip_version in constants.IP_ALLOWED_VERSIONS
for ip_version in const.IP_ALLOWED_VERSIONS
]
self.coerce_good_values = [(addr, addr) for addr in addrs]
self.coerce_bad_values = [
@ -160,7 +159,7 @@ class IPVersionEnumFieldTest(test_base.BaseTestCase, TestField):
super(IPVersionEnumFieldTest, self).setUp()
self.field = common_types.IPVersionEnumField()
self.coerce_good_values = [(val, val)
for val in constants.IP_ALLOWED_VERSIONS]
for val in const.IP_ALLOWED_VERSIONS]
self.coerce_bad_values = [5, 0, -1, 'str']
self.to_primitive_values = self.coerce_good_values
self.from_primitive_values = self.coerce_good_values
@ -207,7 +206,7 @@ class EtherTypeEnumFieldTest(test_base.BaseTestCase, TestField):
super(EtherTypeEnumFieldTest, self).setUp()
self.field = common_types.EtherTypeEnumField()
self.coerce_good_values = [(val, val)
for val in constants.VALID_ETHERTYPES]
for val in const.VALID_ETHERTYPES]
self.coerce_bad_values = ['IpV4', 8, 'str', 'ipv6']
self.to_primitive_values = self.coerce_good_values
self.from_primitive_values = self.coerce_good_values

View File

@ -24,7 +24,6 @@ from oslo_config import cfg
import testtools
from neutron.agent.linux import bridge_lib
from neutron.common import constants as n_const
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import _common_agent as ca
from neutron.tests import base
@ -542,7 +541,7 @@ class TestCommonAgentLoop(base.BaseTestCase):
def test__process_device_if_exists_no_active_binding_in_host(self):
mock_details = {'device': 'dev123',
n_const.NO_ACTIVE_BINDING: True}
constants.NO_ACTIVE_BINDING: True}
self.agent.mgr = mock.Mock()
self.agent._process_device_if_exists(mock_details)
self.agent.mgr.setup_arp_spoofing_protection.assert_not_called()

View File

@ -20,7 +20,6 @@ from neutron_lib.tests import tools
from neutron_lib.utils import net
from oslo_utils import uuidutils
from neutron.common import constants as n_const
from neutron.db.models import l3 as l3_models
from neutron.objects import l3_hamode
from neutron.objects import network as network_obj
@ -71,8 +70,8 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase):
with self.ctx.session.begin(subtransactions=True):
network_obj.Network(self.ctx, id=TEST_HA_NETWORK_ID).create()
self._create_router(distributed=distributed, ha=True)
for state, host in [(n_const.HA_ROUTER_STATE_ACTIVE, HOST),
(n_const.HA_ROUTER_STATE_STANDBY, HOST_2)]:
for state, host in [(constants.HA_ROUTER_STATE_ACTIVE, HOST),
(constants.HA_ROUTER_STATE_STANDBY, HOST_2)]:
self._setup_port_binding(
network_id=TEST_HA_NETWORK_ID,
device_owner=constants.DEVICE_OWNER_ROUTER_HA_INTF,
@ -134,7 +133,8 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase):
l3_hamode.L3HARouterAgentPortBinding(
self.ctx, port_id=port_id, router_id=device_id,
l3_agent_id=agent['id'], state=kwargs.get(
'host_state', n_const.HA_ROUTER_STATE_ACTIVE)).create()
'host_state',
constants.HA_ROUTER_STATE_ACTIVE)).create()
def test_get_distributed_active_network_ports(self):
self._setup_port_binding(

View File

@ -29,7 +29,6 @@ from neutron_lib.plugins import directory
from oslo_serialization import jsonutils
import testtools
from neutron.common import constants as n_const
from neutron.db import agents_db
from neutron.db import common_db_mixin
from neutron.db import l3_agentschedulers_db
@ -262,7 +261,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
router['id'], interface_info)
self.plugin.update_routers_states(
self.adminContext,
{router['id']: n_const.HA_ROUTER_STATE_ACTIVE}, host)
{router['id']: constants.HA_ROUTER_STATE_ACTIVE}, host)
port = self._get_first_interface(subnet['network_id'], router)

View File

@ -16,12 +16,12 @@
import mock
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron.agent.l2 import l2_agent_extensions_manager as l2_ext_manager
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as c_const
from neutron.plugins.ml2.drivers.mech_sriov.agent.common import config # noqa
from neutron.plugins.ml2.drivers.mech_sriov.agent.common import exceptions
from neutron.plugins.ml2.drivers.mech_sriov.agent import sriov_nic_agent
@ -547,11 +547,11 @@ class TestSriovAgent(base.BaseTestCase):
rp_bandwidth = {'ens7': {'egress': 10000, 'ingress': 10000}}
agent = sriov_nic_agent.SriovNicSwitchAgent(
{}, {}, 0, rp_bandwidth, {})
self.assertIn(c_const.RP_BANDWIDTHS,
self.assertIn(constants.RP_BANDWIDTHS,
agent.agent_state['configurations'])
rp_bandwidths = agent.agent_state['configurations'][
c_const.RP_BANDWIDTHS]
constants.RP_BANDWIDTHS]
self.assertEqual(rp_bandwidth['ens7'], rp_bandwidths['ens7'])
def test_configurations_has_rp_default_inventory(self):
@ -563,11 +563,11 @@ class TestSriovAgent(base.BaseTestCase):
}
agent = sriov_nic_agent.SriovNicSwitchAgent(
{}, {}, 0, {}, rp_inventory_values)
self.assertIn(c_const.RP_INVENTORY_DEFAULTS,
self.assertIn(constants.RP_INVENTORY_DEFAULTS,
agent.agent_state['configurations'])
rp_inv_defaults = agent.agent_state['configurations'][
c_const.RP_INVENTORY_DEFAULTS]
constants.RP_INVENTORY_DEFAULTS]
self.assertListEqual(
sorted(list(rp_inventory_values)),
sorted(list(rp_inv_defaults.keys())))

View File

@ -17,7 +17,6 @@
import mock
from neutron_lib import constants as const
from neutron.common import constants as n_const
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent.\
openflow.ovs_ofctl import ovs_bridge_test_base
@ -212,19 +211,19 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase):
ip_addresses = ['2001:db8::1', 'fdf8:f53b:82e4::1/128']
self.br.install_icmpv6_na_spoofing_protection(port, ip_addresses)
expected = [
call.add_flow(dl_type=n_const.ETHERTYPE_IPV6,
call.add_flow(dl_type=const.ETHERTYPE_IPV6,
actions='resubmit(,60)',
icmp_type=const.ICMPV6_TYPE_NA,
nw_proto=const.PROTO_NUM_IPV6_ICMP,
nd_target='2001:db8::1',
priority=2, table=24, in_port=8888),
call.add_flow(dl_type=n_const.ETHERTYPE_IPV6,
call.add_flow(dl_type=const.ETHERTYPE_IPV6,
actions='resubmit(,60)',
icmp_type=const.ICMPV6_TYPE_NA,
nw_proto=const.PROTO_NUM_IPV6_ICMP,
nd_target='fdf8:f53b:82e4::1/128',
priority=2, table=24, in_port=8888),
call.add_flow(dl_type=n_const.ETHERTYPE_IPV6,
call.add_flow(dl_type=const.ETHERTYPE_IPV6,
icmp_type=const.ICMPV6_TYPE_NA,
nw_proto=const.PROTO_NUM_IPV6_ICMP,
priority=10, table=0, in_port=8888,

View File

@ -30,7 +30,6 @@ from neutron.agent.common import ovs_lib
from neutron.agent.common import polling
from neutron.agent.common import utils
from neutron.agent.linux import ip_lib
from neutron.common import constants as c_const
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
@ -817,7 +816,7 @@ class TestOvsNeutronAgent(object):
def test_treat_devices_added_updated_no_active_binding(self):
details = {'device': 'id',
c_const.NO_ACTIVE_BINDING: True}
n_const.NO_ACTIVE_BINDING: True}
port = mock.Mock()
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list_and_failed_devices',
@ -2322,15 +2321,15 @@ class TestOvsNeutronAgent(object):
self.assertFalse(br.install_arp_responder.called)
def test_configurations_has_rp_bandwidth(self):
self.assertIn(c_const.RP_BANDWIDTHS,
self.assertIn(n_const.RP_BANDWIDTHS,
self.agent.agent_state['configurations'])
def test_configurations_has_rp_default_inventory(self):
self.assertIn(c_const.RP_INVENTORY_DEFAULTS,
self.assertIn(n_const.RP_INVENTORY_DEFAULTS,
self.agent.agent_state['configurations'])
rp_inv_defaults = \
self.agent.agent_state['configurations'][
c_const.RP_INVENTORY_DEFAULTS]
n_const.RP_INVENTORY_DEFAULTS]
self.assertListEqual(
sorted(['reserved', 'min_unit', 'allocation_ratio', 'step_size']),
sorted(list(rp_inv_defaults)))
@ -2341,7 +2340,7 @@ class TestOvsNeutronAgent(object):
def test__validate_rp_bandwidth_bridges(self):
cfg.CONF.set_override('bridge_mappings', [], 'OVS')
cfg.CONF.set_override(c_const.RP_BANDWIDTHS,
cfg.CONF.set_override(n_const.RP_BANDWIDTHS,
['no_such_br_in_bridge_mappings:1:1'],
'OVS')
self.assertRaises(ValueError, self._make_agent)

View File

@ -32,7 +32,6 @@ from oslo_context import context as oslo_context
from sqlalchemy.orm import exc
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as n_const
from neutron.db import provisioning_blocks
from neutron.plugins.ml2 import db as ml2_db
from neutron.plugins.ml2.drivers import type_tunnel
@ -162,7 +161,7 @@ class RpcCallbacksTestCase(base.BaseTestCase):
port['device_owner'] = constants.DEVICE_OWNER_COMPUTE_PREFIX
port[portbindings.HOST_ID] = 'other-host'
res = self.callbacks.get_device_details(mock.Mock(), host='host')
self.assertIn(n_const.NO_ACTIVE_BINDING, res)
self.assertIn(constants.NO_ACTIVE_BINDING, res)
def test_get_device_details_qos_policy_id_from_port(self):
port = collections.defaultdict(

View File

@ -18,7 +18,6 @@ from neutron_lib import constants
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron.common import constants as n_const
from neutron.objects.logapi import logging_resource as log_object
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants \
as ovs_consts
@ -174,7 +173,7 @@ class TestOVSFirewallLoggingDriver(base.BaseTestCase):
actions='controller',
cookie=accept_cookie.id,
reg5=self.port_ofport,
dl_type="0x{:04x}".format(n_const.ETHERTYPE_IP),
dl_type="0x{:04x}".format(constants.ETHERTYPE_IP),
nw_proto=constants.PROTO_NUM_TCP,
priority=77,
table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
@ -185,7 +184,7 @@ class TestOVSFirewallLoggingDriver(base.BaseTestCase):
ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE),
cookie=accept_cookie.id,
reg5=self.port_ofport,
dl_type="0x{:04x}".format(n_const.ETHERTYPE_IPV6),
dl_type="0x{:04x}".format(constants.ETHERTYPE_IPV6),
priority=70,
reg7=conj_id + 1,
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE),
@ -195,7 +194,7 @@ class TestOVSFirewallLoggingDriver(base.BaseTestCase):
ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE),
cookie=accept_cookie.id,
reg5=self.port_ofport,
dl_type="0x{:04x}".format(n_const.ETHERTYPE_IP),
dl_type="0x{:04x}".format(constants.ETHERTYPE_IP),
nw_proto=constants.PROTO_NUM_UDP,
priority=77,
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
@ -272,7 +271,7 @@ class TestOVSFirewallLoggingDriver(base.BaseTestCase):
actions='controller',
cookie=accept_cookie.id,
reg5=self.port_ofport,
dl_type="0x{:04x}".format(n_const.ETHERTYPE_IP),
dl_type="0x{:04x}".format(constants.ETHERTYPE_IP),
nw_proto=constants.PROTO_NUM_TCP,
priority=77,
table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,

View File

@ -19,7 +19,6 @@ from neutron_lib.services.qos import base as qos_driver_base
from neutron_lib.services.qos import constants as qos_consts
from oslo_utils import uuidutils
from neutron.common import constants
from neutron.objects import ports as ports_object
from neutron.objects.qos import rule as rule_object
from neutron.services.qos.drivers import manager as driver_mgr
@ -219,7 +218,7 @@ class TestQosDriversManagerRules(TestQosDriversManagerBase):
self.assertEqual(
expected_parsed_range_parameter, parameter_values)
self.assertEqual(
constants.VALUES_TYPE_RANGE, parameter_type)
lib_consts.VALUES_TYPE_RANGE, parameter_type)
parameter_values, parameter_type = (
driver_mgr.QosServiceDriverManager._parse_parameter_values(
@ -227,7 +226,7 @@ class TestQosDriversManagerRules(TestQosDriversManagerBase):
self.assertEqual(
expected_parsed_values_parameter, parameter_values)
self.assertEqual(
constants.VALUES_TYPE_CHOICES, parameter_type)
lib_consts.VALUES_TYPE_CHOICES, parameter_type)
def test_supported_rule_type_details(self):
driver_manager = self._create_manager_with_drivers({
@ -259,11 +258,11 @@ class TestQosDriversManagerRules(TestQosDriversManagerBase):
'name': 'driver-A',
'supported_parameters': [{
'parameter_name': 'max_kbps',
'parameter_type': constants.VALUES_TYPE_RANGE,
'parameter_type': lib_consts.VALUES_TYPE_RANGE,
'parameter_values': {'start': 0, 'end': 1000}
}, {
'parameter_name': 'max_burst_kbps',
'parameter_type': constants.VALUES_TYPE_RANGE,
'parameter_type': lib_consts.VALUES_TYPE_RANGE,
'parameter_values': {'start': 0, 'end': 1000}
}]
}]

View File

@ -28,7 +28,6 @@ from oslo_config import cfg
from oslo_utils import uuidutils
import webob.exc
from neutron.common import constants
from neutron.extensions import qos_rules_alias
from neutron import manager
from neutron.objects import network as network_object
@ -1099,7 +1098,7 @@ class TestQosPlugin(base.BaseQosTestCase):
'name': 'fake-driver',
'supported_parameters': [{
'parameter_name': 'max_kbps',
'parameter_type': constants.VALUES_TYPE_RANGE,
'parameter_type': lib_constants.VALUES_TYPE_RANGE,
'parameter_range': {'start': 0, 'end': 100}
}]
}]

View File

@ -30,7 +30,6 @@ from oslo_serialization import jsonutils
from oslo_utils import importutils
import neutron
from neutron.common import constants as n_const
from neutron import policy
from neutron.tests import base
@ -327,7 +326,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self._test_advsvc_action_on_attr('get', 'port', 'shared', False)
def test_advsvc_update_port_works(self):
kwargs = {n_const.ATTRIBUTES_TO_UPDATE: ['shared']}
kwargs = {constants.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_advsvc_action_on_attr('update', 'port', 'shared', True,
**kwargs)
@ -353,11 +352,11 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self._test_enforce_adminonly_attribute('create_network')
def test_enforce_adminonly_attribute_update(self):
kwargs = {n_const.ATTRIBUTES_TO_UPDATE: ['shared']}
kwargs = {constants.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_enforce_adminonly_attribute('update_network', **kwargs)
def test_reset_adminonly_attr_to_default_fails(self):
kwargs = {n_const.ATTRIBUTES_TO_UPDATE: ['shared']}
kwargs = {constants.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_nonadmin_action_on_attr('update', 'shared', False,
oslo_policy.PolicyNotAuthorized,
**kwargs)