Replaces network:* strings by constants

This patch replaces all occurences of the strings
prefixed by network:* by their constant equivalent.

Closes-bug: #1270863
Change-Id: I149cc0ab7bde08ea83057e6c0697f668edbe29db
This commit is contained in:
Sylvain Afchain 2014-01-17 06:40:43 +01:00 committed by Akihiro Motoki
parent ac8c0c645d
commit fbf0e62139
21 changed files with 69 additions and 55 deletions

View File

@ -520,7 +520,7 @@ class Dnsmasq(DhcpLocalProcess):
# provides all dnsmasq ip as dns-server if there is more than
# one dnsmasq for a subnet and there is no dns-server submitted
# by the server
if port.device_owner == 'network:dhcp':
if port.device_owner == constants.DEVICE_OWNER_DHCP:
for ip in port.fixed_ips:
i = subnet_idx_map.get(ip.subnet_id)
if i is None:

View File

@ -43,8 +43,6 @@ from neutron import wsgi
LOG = logging.getLogger(__name__)
DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
class MetadataProxyHandler(object):
OPTS = [
@ -133,7 +131,7 @@ class MetadataProxyHandler(object):
else:
internal_ports = qclient.list_ports(
device_id=router_id,
device_owner=DEVICE_OWNER_ROUTER_INTF)['ports']
device_owner=n_const.DEVICE_OWNER_ROUTER_INTF)['ports']
networks = [p['network_id'] for p in internal_ports]

View File

@ -38,8 +38,6 @@ from neutron.openstack.common import uuidutils
LOG = logging.getLogger(__name__)
AGENT_OWNER_PREFIX = 'network:'
# Ports with the following 'device_owner' values will not prevent
# network deletion. If delete_network() finds that all ports on a
# network have these owners, it will explicitly delete each port
@ -47,7 +45,7 @@ AGENT_OWNER_PREFIX = 'network:'
# finds out that all existing IP Allocations are associated with ports
# with these owners, it will allow subnet deletion to proceed with the
# IP allocations being cleaned up by cascade.
AUTO_DELETE_PORT_OWNERS = ['network:dhcp']
AUTO_DELETE_PORT_OWNERS = [constants.DEVICE_OWNER_DHCP]
class CommonDbMixin(object):

View File

@ -191,7 +191,7 @@ class DhcpRpcCallbackMixin(object):
tenant_id=network['tenant_id'],
mac_address=attributes.ATTR_NOT_SPECIFIED,
name='',
device_owner='network:dhcp',
device_owner=constants.DEVICE_OWNER_DHCP,
fixed_ips=[dict(subnet_id=s) for s in dhcp_enabled_subnet_ids])
retval = self._port_action(plugin, context, {'port': port_dict},

View File

@ -221,7 +221,7 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
interfaces = []
mapped_router = self._map_state_and_status(router)
router_filter = {
'device_owner': ["network:router_interface"],
'device_owner': [const.DEVICE_OWNER_ROUTER_INTF],
'device_id': [router.get('id')]
}
router_ports = self.get_ports(admin_context,
@ -642,7 +642,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
net = super(NeutronRestProxyV2,
self).get_network(context, new_port["network_id"])
if self.add_meta_server_route:
if new_port['device_owner'] == 'network:dhcp':
if new_port['device_owner'] == const.DEVICE_OWNER_DHCP:
destination = METADATA_SERVER_IP + '/32'
self._add_host_route(context, destination, new_port)

View File

@ -24,6 +24,7 @@ import eventlet
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import rpc as q_rpc
from neutron.common import topics
@ -1180,7 +1181,7 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
:returns: port object
"""
if ('device_id' in port['port'] and port['port']['device_owner'] in
['network:dhcp', 'network:router_interface']):
[constants.DEVICE_OWNER_DHCP, constants.DEVICE_OWNER_ROUTER_INTF]):
p_profile_name = c_conf.CISCO_N1K.network_node_policy_profile
p_profile = self._get_policy_profile_by_name(p_profile_name)
if p_profile:

View File

@ -19,6 +19,7 @@
from heleosapi import info as h_info
from neutron.common import constants
from neutron.db import models_v2
from neutron.openstack.common import log as logging
@ -47,7 +48,8 @@ def retrieve_ip_allocation_info(context, neutron_port):
return
subnet = retrieve_subnet(context, subnet_id)
allocated_ip = neutron_port["fixed_ips"][0]["ip_address"]
is_gw_port = neutron_port["device_owner"] == "network:router_gateway"
is_gw_port = (neutron_port["device_owner"] ==
constants.DEVICE_OWNER_ROUTER_GW)
gateway_ip = subnet["gateway_ip"]
ip_allocation_info = h_info.IpAllocationInfo(

View File

@ -19,6 +19,7 @@
from heleosapi import info as h_info
from neutron.common import constants
from neutron import manager
from neutron.plugins.embrane.l2base import support_base as base
@ -32,7 +33,8 @@ class FakePluginSupport(base.SupportBase):
plugin = manager.NeutronManager.get_plugin()
network_id = neutron_port["network_id"]
network = plugin._get_network(context, network_id)
is_gw = neutron_port["device_owner"] == "network:router_gateway"
is_gw = (neutron_port["device_owner"] ==
constants.DEVICE_OWNER_ROUTER_GW)
result = h_info.UtifInfo(vlan=0,
network_name=network["name"],
network_id=network["id"],

View File

@ -19,6 +19,7 @@
from heleosapi import info as h_info
from neutron.common import constants
from neutron import manager
from neutron.plugins.embrane.l2base import support_base as base
from neutron.plugins.embrane.l2base import support_exceptions as exc
@ -45,7 +46,8 @@ class OpenvswitchSupport(base.SupportBase):
err_msg=_("No segmentation_id found for the network, "
"please be sure that tenant_network_type is vlan"))
network = plugin._get_network(context, network_id)
is_gw = neutron_port["device_owner"] == "network:router_gateway"
is_gw = (neutron_port["device_owner"] ==
constants.DEVICE_OWNER_ROUTER_GW)
result = h_info.UtifInfo(vlan=network_binding["segmentation_id"],
network_name=network["name"],
network_id=network["id"],

View File

@ -163,7 +163,7 @@ def _is_vif_port(port):
def _is_dhcp_port(port):
"""Check whether the given port is a DHCP port."""
device_owner = port['device_owner']
return device_owner.startswith('network:dhcp')
return device_owner.startswith(constants.DEVICE_OWNER_DHCP)
def _check_resource_exists(func, id, name, raise_exc=False):
@ -1024,7 +1024,7 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
rport_qry = context.session.query(models_v2.Port)
dhcp_ports = rport_qry.filter_by(
network_id=subnet["network_id"],
device_owner='network:dhcp').all()
device_owner=constants.DEVICE_OWNER_DHCP).all()
if dhcp_ports and dhcp_ports[0].fixed_ips:
metadata_gw_ip = dhcp_ports[0].fixed_ips[0].ip_address
else:

View File

@ -25,6 +25,7 @@ import netaddr
from oslo.config import cfg
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db
from neutron.db import l3_db
@ -184,7 +185,7 @@ class NeutronPluginPLUMgridV2(db_base_plugin_v2.NeutronDbPluginV2,
port_db = super(NeutronPluginPLUMgridV2, self).create_port(context,
port)
device_id = port_db["device_id"]
if port_db["device_owner"] == "network:router_gateway":
if port_db["device_owner"] == constants.DEVICE_OWNER_ROUTER_GW:
router_db = self._get_router(context, device_id)
else:
router_db = None
@ -212,7 +213,7 @@ class NeutronPluginPLUMgridV2(db_base_plugin_v2.NeutronDbPluginV2,
port_db = super(NeutronPluginPLUMgridV2, self).update_port(
context, port_id, port)
device_id = port_db["device_id"]
if port_db["device_owner"] == "network:router_gateway":
if port_db["device_owner"] == constants.DEVICE_OWNER_ROUTER_GW:
router_db = self._get_router(context, device_id)
else:
router_db = None
@ -242,7 +243,7 @@ class NeutronPluginPLUMgridV2(db_base_plugin_v2.NeutronDbPluginV2,
self.disassociate_floatingips(context, port_id)
super(NeutronPluginPLUMgridV2, self).delete_port(context, port_id)
if port_db["device_owner"] == "network:router_gateway":
if port_db["device_owner"] == constants.DEVICE_OWNER_ROUTER_GW:
device_id = port_db["device_id"]
router_db = self._get_router(context, device_id)
else:

View File

@ -1028,7 +1028,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# Before deleting ports, ensure the peer of a NSX logical
# port with a patch attachment is removed too
port_filter = {'network_id': [id],
'device_owner': ['network:router_interface']}
'device_owner': [constants.DEVICE_OWNER_ROUTER_INTF]}
router_iface_ports = self.get_ports(context, filters=port_filter)
for port in router_iface_ports:
nsx_switch_id, nsx_port_id = nsx_utils.get_nsx_switch_and_port_id(

View File

@ -585,11 +585,12 @@ class TestNecPluginOfcManager(NecPluginV2TestCase):
with self.network() as network:
with self.subnet(network=network):
net = network['network']
p = self._create_resource('port',
{'network_id': net['id'],
'tenant_id': net['tenant_id'],
'device_owner': 'network:dhcp',
'device_id': 'dhcp-port1'})
p = self._create_resource(
'port',
{'network_id': net['id'],
'tenant_id': net['tenant_id'],
'device_owner': constants.DEVICE_OWNER_DHCP,
'device_id': 'dhcp-port1'})
# Make sure that the port is created on OFC.
portinfo = {'id': p['id'], 'port_no': 123}
self.rpcapi_update_ports(added=[portinfo])

View File

@ -31,6 +31,7 @@ from neutron.api.v2 import attributes
from neutron.api.v2.attributes import ATTR_NOT_SPECIFIED
from neutron.api.v2.router import APIRouter
from neutron.common import config
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common.test_lib import test_config
from neutron import context
@ -1132,7 +1133,8 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
admin_state_up=True)
network = self.deserialize(self.fmt, res)
network_id = network['network']['id']
self._create_port(self.fmt, network_id, device_owner='network:dhcp')
self._create_port(self.fmt, network_id,
device_owner=constants.DEVICE_OWNER_DHCP)
req = self.new_delete_request('networks', network_id)
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
@ -2412,7 +2414,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
cidr, ip_version=4)
self._create_port(self.fmt,
network['network']['id'],
device_owner='network:dhcp')
device_owner=constants.DEVICE_OWNER_DHCP)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)

View File

@ -15,6 +15,7 @@
import mock
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.db import dhcp_rpc_base
from neutron.openstack.common.db import exception as db_exc
@ -54,7 +55,7 @@ class TestDhcpRpcCallbackMixin(base.BaseTestCase):
def _test__port_action_with_failures(self, exc=None, action=None):
port = {
'network_id': 'foo_network_id',
'device_owner': 'network:dhcp',
'device_owner': constants.DEVICE_OWNER_DHCP,
'fixed_ips': [{'subnet_id': 'foo_subnet_id'}]
}
self.plugin.create_port.side_effect = exc
@ -187,7 +188,7 @@ class TestDhcpRpcCallbackMixin(base.BaseTestCase):
create_spec = dict(tenant_id='tenantid', device_id='devid',
network_id='netid', name='',
admin_state_up=True,
device_owner='network:dhcp',
device_owner=constants.DEVICE_OWNER_DHCP,
mac_address=mock.ANY)
create_retval = create_spec.copy()
create_retval['id'] = 'port_id'

View File

@ -19,6 +19,7 @@ import contextlib
from oslo.config import cfg
from webob import exc
from neutron.common import constants
from neutron.db import extraroute_db
from neutron.extensions import extraroute
from neutron.extensions import l3
@ -392,7 +393,6 @@ class ExtraRouteDBTestCaseBase(object):
p['port']['id'])
def test_router_update_on_external_port(self):
DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
self._set_net_external(s['subnet']['network_id'])
@ -402,11 +402,12 @@ class ExtraRouteDBTestCaseBase(object):
body = self._show('routers', r['router']['id'])
net_id = body['router']['external_gateway_info']['network_id']
self.assertEqual(net_id, s['subnet']['network_id'])
port_res = self._list_ports('json',
200,
s['subnet']['network_id'],
tenant_id=r['router']['tenant_id'],
device_own=DEVICE_OWNER_ROUTER_GW)
port_res = self._list_ports(
'json',
200,
s['subnet']['network_id'],
tenant_id=r['router']['tenant_id'],
device_own=constants.DEVICE_OWNER_ROUTER_GW)
port_list = self.deserialize('json', port_res)
self.assertEqual(len(port_list['ports']), 1)

View File

@ -1146,7 +1146,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
port_id=private_port['port']['id'])
self.assertEqual(res.status_int, 400)
for p in self._list('ports')['ports']:
if p['device_owner'] == 'network:floatingip':
if (p['device_owner'] ==
l3_constants.DEVICE_OWNER_FLOATINGIP):
self.fail('garbage port is not deleted')
self._remove_external_gateway_from_router(
r['router']['id'],
@ -1184,7 +1185,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.assertEqual(res.status_int, exc.HTTPConflict.code)
for p in self._list('ports')['ports']:
if p['device_owner'] == 'network:floatingip':
if (p['device_owner'] ==
l3_constants.DEVICE_OWNER_FLOATINGIP):
self.fail('garbage port is not deleted')
self._remove_external_gateway_from_router(
@ -1341,7 +1343,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
found = False
with self.floatingip_with_assoc():
for p in self._list('ports')['ports']:
if p['device_owner'] == 'network:floatingip':
if p['device_owner'] == l3_constants.DEVICE_OWNER_FLOATINGIP:
self._delete('ports', p['id'],
expected_code=exc.HTTPConflict.code)
found = True
@ -1524,7 +1526,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
found = False
with self.floatingip_with_assoc():
for p in self._list('ports')['ports']:
if p['device_owner'] == 'network:router_interface':
if p['device_owner'] == l3_constants.DEVICE_OWNER_ROUTER_INTF:
subnet_id = p['fixed_ips'][0]['subnet_id']
router_id = p['device_id']
self._router_interface_action(
@ -1538,7 +1540,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
found = False
with self.floatingip_with_assoc():
for p in self._list('ports')['ports']:
if p['device_owner'] == 'network:router_interface':
if p['device_owner'] == l3_constants.DEVICE_OWNER_ROUTER_INTF:
router_id = p['device_id']
self._router_interface_action(
'remove', router_id, None, p['id'],

View File

@ -23,6 +23,7 @@ from oslo.config import cfg
from neutron.agent.common import config
from neutron.agent.linux import dhcp
from neutron.common import config as base_config
from neutron.common import constants
from neutron.openstack.common import log as logging
from neutron.tests import base
@ -80,7 +81,7 @@ class FakePort3:
class FakeRouterPort:
id = 'rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr'
admin_state_up = True
device_owner = 'network:router_interface'
device_owner = constants.DEVICE_OWNER_ROUTER_INTF
fixed_ips = [FakeIPAllocation('192.168.0.1',
'dddddddd-dddd-dddd-dddd-dddddddddddd')]
mac_address = '00:00:0f:rr:rr:rr'
@ -92,7 +93,7 @@ class FakeRouterPort:
class FakePortMultipleAgents1:
id = 'rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr'
admin_state_up = True
device_owner = 'network:dhcp'
device_owner = constants.DEVICE_OWNER_DHCP
fixed_ips = [FakeIPAllocation('192.168.0.5',
'dddddddd-dddd-dddd-dddd-dddddddddddd')]
mac_address = '00:00:0f:dd:dd:dd'
@ -104,7 +105,7 @@ class FakePortMultipleAgents1:
class FakePortMultipleAgents2:
id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
admin_state_up = True
device_owner = 'network:dhcp'
device_owner = constants.DEVICE_OWNER_DHCP
fixed_ips = [FakeIPAllocation('192.168.0.6',
'dddddddd-dddd-dddd-dddd-dddddddddddd')]
mac_address = '00:00:0f:ee:ee:ee'

View File

@ -23,6 +23,7 @@ import testtools
import webob
from neutron.agent.metadata import agent
from neutron.common import constants
from neutron.common import utils
from neutron.tests import base
@ -112,7 +113,7 @@ class TestMetadataProxyHandler(base.BaseTestCase):
expected.append(
mock.call().list_ports(
device_id=router_id,
device_owner='network:router_interface'
device_owner=constants.DEVICE_OWNER_ROUTER_INTF
)
)

View File

@ -17,6 +17,7 @@ import mock
from oslo.config import cfg
from neutron.common import constants as n_consts
from neutron.common import exceptions as n_exc
from neutron import context
from neutron.db import api as db
@ -940,12 +941,12 @@ class DhcpAgentNotifyAPITestCase(base.BaseTestCase):
def test_notify_ports_update_with_special_ports(self):
ports = [{'fixed_ips': [],
'device_id': '',
'device_owner': 'network:dhcp',
'device_owner': n_consts.DEVICE_OWNER_DHCP,
'mac_address': 'fa:16:3e:da:1d:46'},
{'fixed_ips': [{'subnet_id': 'foo_subnet_id',
'ip_address': '1.2.3.4'}],
'device_id': 'foo_device_id',
'device_owner': 'network:router_gateway',
'device_owner': n_consts.DEVICE_OWNER_ROUTER_GW,
'mac_address': 'fa:16:3e:da:1d:46'}]
call_args = mock.call(
mock.ANY, 'foo_network_id', 'foo_subnet_id', dhcp=[], meta=[])
@ -1014,7 +1015,7 @@ class DhcpAgentNotifyAPITestCase(base.BaseTestCase):
'admin_state_up': True,
'network_id': 'foo_network_id',
'tenant_id': 'foo_tenant_id',
'device_owner': 'network:dhcp',
'device_owner': n_consts.DEVICE_OWNER_DHCP,
'mac_address': mock.ANY,
'fixed_ips': [{'subnet_id': 'foo_subnet_id'}],
'device_id': ''
@ -1102,7 +1103,7 @@ class DhcpAgentNotifyAPITestCase(base.BaseTestCase):
self.notifier.notify(mock.ANY, {'subnet': subnet}, 'subnet.delete.end')
filters = {
'network_id': [subnet['network_id']],
'device_owner': ['network:dhcp']
'device_owner': [n_consts.DEVICE_OWNER_DHCP]
}
self.plugin.get_ports.assert_called_once_with(
mock.ANY, filters=filters)
@ -1149,7 +1150,7 @@ class DhcpTestCase(base.BaseTestCase):
}
port = {
'id': 'foo_port_id',
'device_owner': 'network:dhcp',
'device_owner': n_consts.DEVICE_OWNER_DHCP,
'mac_address': 'aa:bb:cc:dd:ee:ff',
'network_id': 'foo_network_id',
'fixed_ips': [{'subnet_id': subnet['id']}]
@ -1184,7 +1185,7 @@ class DhcpTestCase(base.BaseTestCase):
def test_handle_delete_dhcp_owner_port(self):
port = {
'id': 'foo_port_id',
'device_owner': 'network:dhcp',
'device_owner': n_consts.DEVICE_OWNER_DHCP,
'network_id': 'foo_network_id',
'fixed_ips': [],
'mac_address': 'aa:bb:cc:dd:ee:ff'
@ -1297,15 +1298,15 @@ class MetadataTestCase(base.BaseTestCase):
def test_handle_port_metadata_access_dhcp_port(self):
self._test_handle_port_metadata_access_special_owners(
'network:dhcp', [{'subnet_id': 'foo_subnet'}])
n_consts.DEVICE_OWNER_DHCP, [{'subnet_id': 'foo_subnet'}])
def test_handle_port_metadata_access_router_port(self):
self._test_handle_port_metadata_access_special_owners(
'network:router_interface', [{'subnet_id': 'foo_subnet'}])
n_consts.DEVICE_OWNER_ROUTER_INTF, [{'subnet_id': 'foo_subnet'}])
def test_handle_port_metadata_access_no_device_id(self):
self._test_handle_port_metadata_access_special_owners(
'network:dhcp', '')
n_consts.DEVICE_OWNER_DHCP, '')
def test_handle_port_metadata_access_no_fixed_ips(self):
self._test_handle_port_metadata_access_special_owners(

View File

@ -221,7 +221,7 @@ class TestPortsV2(NsxPluginV2TestCase,
with mock.patch.object(nsx_db, 'add_neutron_nsx_port_mapping',
side_effect=db_exception):
with self.network() as net:
with self.port(device_owner='network:dhcp'):
with self.port(device_owner=constants.DEVICE_OWNER_DHCP):
self._verify_no_orphan_left(net['network']['id'])
def test_create_port_maintenance_returns_503(self):
@ -909,7 +909,7 @@ class TestL3NatTestCase(L3NatTest,
subnets = self._list('subnets')['subnets']
with self.subnet() as s:
with self.port(subnet=s, device_id='1234',
device_owner='network:dhcp'):
device_owner=constants.DEVICE_OWNER_DHCP):
subnets = self._list('subnets')['subnets']
self.assertEqual(len(subnets), 1)
self.assertEqual(subnets[0]['host_routes'][0]['nexthop'],