Merge "ofagent: Use port desc to monitor ports on br-int"
This commit is contained in:
commit
f8b4eb12af
@ -40,6 +40,7 @@ from neutron import context
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.openstack.common import loopingcall
|
||||
from neutron.plugins.common import constants as p_const
|
||||
from neutron.plugins.ofagent.agent import ports
|
||||
from neutron.plugins.ofagent.common import config # noqa
|
||||
from neutron.plugins.openvswitch.common import constants
|
||||
|
||||
@ -301,6 +302,36 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
||||
self._report_state)
|
||||
heartbeat.start(interval=report_interval)
|
||||
|
||||
@staticmethod
|
||||
def _get_ofport_name(interface_id):
|
||||
"""Convert from neutron device id (uuid) to OpenFlow port name.
|
||||
|
||||
This needs to be synced with ML2 plugin's _device_to_port_id().
|
||||
|
||||
An assumption: The switch uses an OS's interface name as the
|
||||
corresponding OpenFlow port name.
|
||||
NOTE(yamamoto): While it's true for Open vSwitch, it isn't
|
||||
necessarily true everywhere. For example, LINC uses something
|
||||
like "LogicalSwitch0-Port2".
|
||||
"""
|
||||
return "tap" + interface_id[0:11]
|
||||
|
||||
def _get_ports(self, br):
|
||||
"""Generate ports.Port instances for the given bridge."""
|
||||
datapath = br.datapath
|
||||
ofpp = datapath.ofproto_parser
|
||||
msg = ofpp.OFPPortDescStatsRequest(datapath=datapath)
|
||||
descs = ryu_api.send_msg(app=self.ryuapp, msg=msg,
|
||||
reply_cls=ofpp.OFPPortDescStatsReply,
|
||||
reply_multi=True)
|
||||
for d in descs:
|
||||
for p in d.body:
|
||||
yield ports.Port.from_ofp_port(p)
|
||||
|
||||
def _get_ofport_names(self, br):
|
||||
"""Return a set of OpenFlow port names for the given bridge."""
|
||||
return set(p.port_name for p in self._get_ports(br))
|
||||
|
||||
def get_net_uuid(self, vif_id):
|
||||
for network_id, vlan_mapping in self.local_vlan_map.iteritems():
|
||||
if vif_id in vlan_mapping.vif_ports:
|
||||
@ -322,7 +353,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
||||
# Even if full port details might be provided to this call,
|
||||
# they are not used since there is no guarantee the notifications
|
||||
# are processed in the same order as the relevant API requests
|
||||
self.updated_ports.add(port['id'])
|
||||
self.updated_ports.add(self._get_ofport_name(port['id']))
|
||||
LOG.debug(_("port_update received port %s"), port['id'])
|
||||
|
||||
def tunnel_update(self, context, **kwargs):
|
||||
@ -600,7 +631,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
||||
self.provision_local_vlan(net_uuid, network_type,
|
||||
physical_network, segmentation_id)
|
||||
lvm = self.local_vlan_map[net_uuid]
|
||||
lvm.vif_ports[port.vif_id] = port
|
||||
lvm.vif_ports[port.port_name] = port
|
||||
# Do not bind a port if it's already bound
|
||||
cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag")
|
||||
if cur_tag != str(lvm.vlan):
|
||||
@ -911,7 +942,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
||||
br, physical_network, bridge, ip_wrapper)
|
||||
|
||||
def scan_ports(self, registered_ports, updated_ports=None):
|
||||
cur_ports = self.int_br.get_vif_port_set()
|
||||
cur_ports = self._get_ofport_names(self.int_br)
|
||||
self.int_br_device_count = len(cur_ports)
|
||||
port_info = {'current': cur_ports}
|
||||
if updated_ports is None:
|
||||
@ -941,25 +972,32 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
||||
The returned value is a set of port ids of the ports concerned by a
|
||||
vlan tag loss.
|
||||
"""
|
||||
# TODO(yamamoto): stop using ovsdb
|
||||
# an idea is to use metadata instead of tagged vlans.
|
||||
# cf. blueprint ofagent-merge-bridges
|
||||
port_tags = self.int_br.get_port_tag_dict()
|
||||
changed_ports = set()
|
||||
for lvm in self.local_vlan_map.values():
|
||||
for port in registered_ports:
|
||||
if (
|
||||
port in lvm.vif_ports
|
||||
and lvm.vif_ports[port].port_name in port_tags
|
||||
and port_tags[lvm.vif_ports[port].port_name] != lvm.vlan
|
||||
and port in port_tags
|
||||
and port_tags[port] != lvm.vlan
|
||||
):
|
||||
LOG.info(
|
||||
_("Port '%(port_name)s' has lost "
|
||||
"its vlan tag '%(vlan_tag)d'!"),
|
||||
{'port_name': lvm.vif_ports[port].port_name,
|
||||
{'port_name': port,
|
||||
'vlan_tag': lvm.vlan}
|
||||
)
|
||||
changed_ports.add(port)
|
||||
return changed_ports
|
||||
|
||||
def update_ancillary_ports(self, registered_ports):
|
||||
# TODO(yamamoto): stop using ovsdb
|
||||
# - do the same as scan_ports
|
||||
# - or, find a way to update status of ancillary ports differently
|
||||
# eg. let interface drivers mark ports up
|
||||
ports = set()
|
||||
for bridge in self.ancillary_brs:
|
||||
ports |= bridge.get_vif_port_set()
|
||||
@ -981,7 +1019,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
||||
# error condition of which operators should be aware
|
||||
if not vif_port.ofport:
|
||||
LOG.warn(_("VIF port: %s has no ofport configured, and might "
|
||||
"not be able to transmit"), vif_port.vif_id)
|
||||
"not be able to transmit"), vif_port.port_name)
|
||||
if admin_state_up:
|
||||
self.port_bound(vif_port, network_id, network_type,
|
||||
physical_network, segmentation_id)
|
||||
@ -1051,16 +1089,17 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
||||
|
||||
def treat_devices_added_or_updated(self, devices):
|
||||
resync = False
|
||||
all_ports = dict((p.port_name, p) for p in self._get_ports())
|
||||
for device in devices:
|
||||
LOG.debug(_("Processing port %s"), device)
|
||||
port = self.int_br.get_vif_port_by_id(device)
|
||||
if not port:
|
||||
if device not in all_ports:
|
||||
# The port has disappeared and should not be processed
|
||||
# There is no need to put the port DOWN in the plugin as
|
||||
# it never went up in the first place
|
||||
LOG.info(_("Port %s was not found on the integration bridge "
|
||||
"and will therefore not be processed"), device)
|
||||
continue
|
||||
port = all_ports[device]
|
||||
try:
|
||||
details = self.plugin_rpc.get_device_details(self.context,
|
||||
device,
|
||||
@ -1369,6 +1408,9 @@ class OFANeutronAgent(n_rpc.RpcCallback,
|
||||
self.iter_num = self.iter_num + 1
|
||||
|
||||
def daemon_loop(self):
|
||||
# TODO(yamamoto): make polling logic stop using ovsdb monitor
|
||||
# - make it a dumb periodic polling
|
||||
# - or, monitor port status async messages
|
||||
with polling.get_polling_manager(
|
||||
self.minimize_polling,
|
||||
self.root_helper,
|
||||
|
27
neutron/plugins/ofagent/agent/ports.py
Normal file
27
neutron/plugins/ofagent/agent/ports.py
Normal file
@ -0,0 +1,27 @@
|
||||
# Copyright (C) 2014 VA Linux Systems Japan K.K.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K.
|
||||
|
||||
|
||||
class Port(object):
|
||||
def __init__(self, port_name, ofport):
|
||||
self.port_name = port_name
|
||||
self.ofport = ofport
|
||||
|
||||
@classmethod
|
||||
def from_ofp_port(cls, ofp_port):
|
||||
"""Convert from ryu OFPPort."""
|
||||
return cls(port_name=ofp_port.name, ofport=ofp_port.port_no)
|
@ -62,7 +62,7 @@ def _mkcls(name):
|
||||
self._hist = []
|
||||
|
||||
def __getattr__(self, name):
|
||||
return name
|
||||
return self._kwargs[name]
|
||||
|
||||
def __repr__(self):
|
||||
args = map(repr, self._args)
|
||||
@ -74,6 +74,8 @@ def _mkcls(name):
|
||||
|
||||
|
||||
class _Mod(object):
|
||||
_cls_cache = {}
|
||||
|
||||
def __init__(self, name):
|
||||
self._name = name
|
||||
|
||||
@ -81,7 +83,13 @@ class _Mod(object):
|
||||
fullname = '%s.%s' % (self._name, name)
|
||||
if '_' in name: # constants are named like OFPxxx_yyy_zzz
|
||||
return _SimpleValue(fullname)
|
||||
return _mkcls(fullname)
|
||||
try:
|
||||
return self._cls_cache[fullname]
|
||||
except KeyError:
|
||||
pass
|
||||
cls = _mkcls(fullname)
|
||||
self._cls_cache[fullname] = cls
|
||||
return cls
|
||||
|
||||
def __repr__(self):
|
||||
return 'Mod(%s)' % (self._name,)
|
||||
|
@ -26,7 +26,6 @@ from oslo.config import cfg
|
||||
import testtools
|
||||
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import ovs_lib
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.plugins.common import constants as p_const
|
||||
@ -333,12 +332,12 @@ class TestOFANeutronAgent(OFAAgentTestCase):
|
||||
def test_port_dead_with_port_already_dead(self):
|
||||
self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG)
|
||||
|
||||
def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
|
||||
def mock_scan_ports(self, port_set=None, registered_ports=None,
|
||||
updated_ports=None, port_tags_dict=None):
|
||||
port_tags_dict = port_tags_dict or {}
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_set',
|
||||
return_value=vif_port_set),
|
||||
mock.patch.object(self.agent, '_get_ofport_names',
|
||||
return_value=port_set),
|
||||
mock.patch.object(self.agent.int_br, 'get_port_tag_dict',
|
||||
return_value=port_tags_dict)
|
||||
):
|
||||
@ -395,31 +394,33 @@ class TestOFANeutronAgent(OFAAgentTestCase):
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_update_ports_returns_lost_vlan_port(self):
|
||||
br = self.mod_agent.OVSBridge('br-int', 'fake_helper', self.ryuapp)
|
||||
mac = "ca:fe:de:ad:be:ef"
|
||||
port = ovs_lib.VifPort(1, 1, 1, mac, br)
|
||||
port = mock.Mock(port_name='tap00000001-00', ofport=1)
|
||||
lvm = self.mod_agent.LocalVLANMapping(
|
||||
1, '1', None, 1, {port.vif_id: port})
|
||||
vlan=1, network_type='1', physical_network=None, segmentation_id=1,
|
||||
vif_ports={port.port_name: port})
|
||||
local_vlan_map = {'1': lvm}
|
||||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 2])
|
||||
port_tags_dict = {1: []}
|
||||
port_set = set(['tap00000001-00',
|
||||
'tap00000003-00'])
|
||||
registered_ports = set(['tap00000001-00', 'tap00000002-00'])
|
||||
port_tags_dict = {'tap00000001-00': []}
|
||||
expected = dict(
|
||||
added=set([3]), current=vif_port_set,
|
||||
removed=set([2]), updated=set([1])
|
||||
added=set(['tap00000003-00']),
|
||||
current=set(['tap00000001-00', 'tap00000003-00']),
|
||||
removed=set(['tap00000002-00']),
|
||||
updated=set(['tap00000001-00'])
|
||||
)
|
||||
with mock.patch.dict(self.agent.local_vlan_map, local_vlan_map):
|
||||
actual = self.mock_scan_ports(
|
||||
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
|
||||
port_set, registered_ports, port_tags_dict=port_tags_dict)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_treat_devices_added_returns_true_for_missing_device(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
||||
side_effect=Exception()),
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||
return_value=mock.Mock())):
|
||||
self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
|
||||
mock.patch.object(self.agent, '_get_ports',
|
||||
return_value=[mock.Mock(port_name='xxx')])):
|
||||
self.assertTrue(self.agent.treat_devices_added_or_updated(['xxx']))
|
||||
|
||||
def _mock_treat_devices_added_updated(self, details, port, func_name):
|
||||
"""Mock treat devices added or updated.
|
||||
@ -432,13 +433,14 @@ class TestOFANeutronAgent(OFAAgentTestCase):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
||||
return_value=details),
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||
return_value=port),
|
||||
mock.patch.object(self.agent, '_get_ports',
|
||||
return_value=[port]),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
||||
mock.patch.object(self.agent, func_name)
|
||||
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
|
||||
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
|
||||
self.assertFalse(self.agent.treat_devices_added_or_updated(
|
||||
[port.port_name]))
|
||||
return func.called
|
||||
|
||||
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
|
||||
@ -478,14 +480,15 @@ class TestOFANeutronAgent(OFAAgentTestCase):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
||||
return_value=fake_details_dict),
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||
return_value=mock.MagicMock()),
|
||||
mock.patch.object(self.agent, '_get_ports',
|
||||
return_value=[mock.Mock(port_name='xxx')]),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
||||
mock.patch.object(self.agent, 'treat_vif_port')
|
||||
) as (get_dev_fn, get_vif_func, upd_dev_up,
|
||||
upd_dev_down, treat_vif_port):
|
||||
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
|
||||
self.assertFalse(self.agent.treat_devices_added_or_updated(
|
||||
['xxx']))
|
||||
self.assertTrue(treat_vif_port.called)
|
||||
self.assertTrue(upd_dev_down.called)
|
||||
|
||||
@ -561,7 +564,7 @@ class TestOFANeutronAgent(OFAAgentTestCase):
|
||||
recl_fn.assert_called_with("123")
|
||||
|
||||
def test_port_update(self):
|
||||
port = {"id": "123",
|
||||
port = {"id": "b1981919-f516-11e3-a8f4-08606e7f74e7",
|
||||
"network_id": "124",
|
||||
"admin_state_up": False}
|
||||
self.agent.port_update("unused_context",
|
||||
@ -569,7 +572,7 @@ class TestOFANeutronAgent(OFAAgentTestCase):
|
||||
network_type="vlan",
|
||||
segmentation_id="1",
|
||||
physical_network="physnet")
|
||||
self.assertEqual(set(['123']), self.agent.updated_ports)
|
||||
self.assertEqual(set(['tapb1981919-f5']), self.agent.updated_ports)
|
||||
|
||||
def test_setup_physical_bridges(self):
|
||||
with contextlib.nested(
|
||||
@ -966,6 +969,32 @@ class TestOFANeutronAgent(OFAAgentTestCase):
|
||||
table_id=ofp.OFPTT_ALL)
|
||||
sendmsg.assert_has_calls([mock.call(expected_msg)])
|
||||
|
||||
def test__get_ports(self):
|
||||
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
||||
reply = [ofpp.OFPPortDescStatsReply(body=[ofpp.OFPPort(name='hoge',
|
||||
port_no=8)])]
|
||||
sendmsg = mock.Mock(return_value=reply)
|
||||
self.mod_agent.ryu_api.send_msg = sendmsg
|
||||
result = self.agent._get_ports(self.agent.int_br)
|
||||
result = list(result) # convert generator to list.
|
||||
self.assertEqual(1, len(result))
|
||||
self.assertEqual('hoge', result[0].port_name)
|
||||
self.assertEqual(8, result[0].ofport)
|
||||
expected_msg = ofpp.OFPPortDescStatsRequest(
|
||||
datapath=self.agent.int_br.datapath)
|
||||
sendmsg.assert_has_calls([mock.call(app=self.agent.ryuapp,
|
||||
msg=expected_msg, reply_cls=ofpp.OFPPortDescStatsReply,
|
||||
reply_multi=True)])
|
||||
|
||||
def test__get_ofport_names(self):
|
||||
names = ['p111', 'p222', 'p333']
|
||||
ps = [mock.Mock(port_name=x, ofport=names.index(x)) for x in names]
|
||||
with mock.patch.object(self.agent, '_get_ports',
|
||||
return_value=ps) as _get_ports:
|
||||
result = self.agent._get_ofport_names('hoge')
|
||||
_get_ports.assert_called_once_with('hoge')
|
||||
self.assertEqual(set(names), result)
|
||||
|
||||
|
||||
class AncillaryBridgesTest(OFAAgentTestCase):
|
||||
|
||||
|
32
neutron/tests/unit/ofagent/test_ofa_ports.py
Normal file
32
neutron/tests/unit/ofagent/test_ofa_ports.py
Normal file
@ -0,0 +1,32 @@
|
||||
# Copyright (C) 2014 VA Linux Systems Japan K.K.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K.
|
||||
|
||||
|
||||
import mock
|
||||
|
||||
from neutron.plugins.ofagent.agent import ports
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class TestOFAgentPorts(base.BaseTestCase):
|
||||
def test_port(self):
|
||||
p1 = ports.Port(port_name='foo', ofport=999)
|
||||
ryu_ofp_port = mock.Mock(port_no=999)
|
||||
ryu_ofp_port.name = 'foo'
|
||||
p2 = ports.Port.from_ofp_port(ofp_port=ryu_ofp_port)
|
||||
self.assertEqual(p1.port_name, p2.port_name)
|
||||
self.assertEqual(p1.ofport, p2.ofport)
|
Loading…
Reference in New Issue
Block a user