ofagent: Use port desc to monitor ports on br-int

also, use ofport.name as device identifiers.
sprinkle some TODO comments.

Implements: blueprint ofagent-port-monitor
Change-Id: I4995964f37729f954fec71c4a2e61e463a430b1a
This commit is contained in:
YAMAMOTO Takashi 2014-06-03 15:53:22 +09:00
parent d379170109
commit 9d13ea884b
5 changed files with 174 additions and 36 deletions

View File

@ -41,6 +41,7 @@ from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.ofagent.agent import ports
from neutron.plugins.ofagent.common import config # noqa
from neutron.plugins.openvswitch.common import constants
@ -302,6 +303,36 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
self._report_state)
heartbeat.start(interval=report_interval)
@staticmethod
def _get_ofport_name(interface_id):
"""Convert from neutron device id (uuid) to OpenFlow port name.
This needs to be synced with ML2 plugin's _device_to_port_id().
An assumption: The switch uses an OS's interface name as the
corresponding OpenFlow port name.
NOTE(yamamoto): While it's true for Open vSwitch, it isn't
necessarily true everywhere. For example, LINC uses something
like "LogicalSwitch0-Port2".
"""
return "tap" + interface_id[0:11]
def _get_ports(self, br):
"""Generate ports.Port instances for the given bridge."""
datapath = br.datapath
ofpp = datapath.ofproto_parser
msg = ofpp.OFPPortDescStatsRequest(datapath=datapath)
descs = ryu_api.send_msg(app=self.ryuapp, msg=msg,
reply_cls=ofpp.OFPPortDescStatsReply,
reply_multi=True)
for d in descs:
for p in d.body:
yield ports.Port.from_ofp_port(p)
def _get_ofport_names(self, br):
"""Return a set of OpenFlow port names for the given bridge."""
return set(p.port_name for p in self._get_ports(br))
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in self.local_vlan_map.iteritems():
if vif_id in vlan_mapping.vif_ports:
@ -323,7 +354,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
# Even if full port details might be provided to this call,
# they are not used since there is no guarantee the notifications
# are processed in the same order as the relevant API requests
self.updated_ports.add(port['id'])
self.updated_ports.add(self._get_ofport_name(port['id']))
LOG.debug(_("port_update received port %s"), port['id'])
def tunnel_update(self, context, **kwargs):
@ -609,7 +640,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
self.provision_local_vlan(net_uuid, network_type,
physical_network, segmentation_id)
lvm = self.local_vlan_map[net_uuid]
lvm.vif_ports[port.vif_id] = port
lvm.vif_ports[port.port_name] = port
# Do not bind a port if it's already bound
cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag")
if cur_tag != str(lvm.vlan):
@ -920,7 +951,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
br, physical_network, bridge, ip_wrapper)
def scan_ports(self, registered_ports, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
cur_ports = self._get_ofport_names(self.int_br)
self.int_br_device_count = len(cur_ports)
port_info = {'current': cur_ports}
if updated_ports is None:
@ -950,25 +981,32 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
The returned value is a set of port ids of the ports concerned by a
vlan tag loss.
"""
# TODO(yamamoto): stop using ovsdb
# an idea is to use metadata instead of tagged vlans.
# cf. blueprint ofagent-merge-bridges
port_tags = self.int_br.get_port_tag_dict()
changed_ports = set()
for lvm in self.local_vlan_map.values():
for port in registered_ports:
if (
port in lvm.vif_ports
and lvm.vif_ports[port].port_name in port_tags
and port_tags[lvm.vif_ports[port].port_name] != lvm.vlan
and port in port_tags
and port_tags[port] != lvm.vlan
):
LOG.info(
_("Port '%(port_name)s' has lost "
"its vlan tag '%(vlan_tag)d'!"),
{'port_name': lvm.vif_ports[port].port_name,
{'port_name': port,
'vlan_tag': lvm.vlan}
)
changed_ports.add(port)
return changed_ports
def update_ancillary_ports(self, registered_ports):
# TODO(yamamoto): stop using ovsdb
# - do the same as scan_ports
# - or, find a way to update status of ancillary ports differently
# eg. let interface drivers mark ports up
ports = set()
for bridge in self.ancillary_brs:
ports |= bridge.get_vif_port_set()
@ -990,7 +1028,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
# error condition of which operators should be aware
if not vif_port.ofport:
LOG.warn(_("VIF port: %s has no ofport configured, and might "
"not be able to transmit"), vif_port.vif_id)
"not be able to transmit"), vif_port.port_name)
if admin_state_up:
self.port_bound(vif_port, network_id, network_type,
physical_network, segmentation_id)
@ -1060,16 +1098,17 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
def treat_devices_added_or_updated(self, devices):
resync = False
all_ports = dict((p.port_name, p) for p in self._get_ports())
for device in devices:
LOG.debug(_("Processing port %s"), device)
port = self.int_br.get_vif_port_by_id(device)
if not port:
if device not in all_ports:
# The port has disappeared and should not be processed
# There is no need to put the port DOWN in the plugin as
# it never went up in the first place
LOG.info(_("Port %s was not found on the integration bridge "
"and will therefore not be processed"), device)
continue
port = all_ports[device]
try:
details = self.plugin_rpc.get_device_details(self.context,
device,
@ -1378,6 +1417,9 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
self.iter_num = self.iter_num + 1
def daemon_loop(self):
# TODO(yamamoto): make polling logic stop using ovsdb monitor
# - make it a dumb periodic polling
# - or, monitor port status async messages
with polling.get_polling_manager(
self.minimize_polling,
self.root_helper,

View File

@ -0,0 +1,27 @@
# Copyright (C) 2014 VA Linux Systems Japan K.K.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K.
class Port(object):
def __init__(self, port_name, ofport):
self.port_name = port_name
self.ofport = ofport
@classmethod
def from_ofp_port(cls, ofp_port):
"""Convert from ryu OFPPort."""
return cls(port_name=ofp_port.name, ofport=ofp_port.port_no)

View File

@ -62,7 +62,7 @@ def _mkcls(name):
self._hist = []
def __getattr__(self, name):
return name
return self._kwargs[name]
def __repr__(self):
args = map(repr, self._args)
@ -74,6 +74,8 @@ def _mkcls(name):
class _Mod(object):
_cls_cache = {}
def __init__(self, name):
self._name = name
@ -81,7 +83,13 @@ class _Mod(object):
fullname = '%s.%s' % (self._name, name)
if '_' in name: # constants are named like OFPxxx_yyy_zzz
return _SimpleValue(fullname)
return _mkcls(fullname)
try:
return self._cls_cache[fullname]
except KeyError:
pass
cls = _mkcls(fullname)
self._cls_cache[fullname] = cls
return cls
def __repr__(self):
return 'Mod(%s)' % (self._name,)

View File

@ -26,7 +26,6 @@ from oslo.config import cfg
import testtools
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.openstack.common import importutils
from neutron.plugins.common import constants as p_const
@ -333,12 +332,12 @@ class TestOFANeutronAgent(OFAAgentTestCase):
def test_port_dead_with_port_already_dead(self):
self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG)
def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
def mock_scan_ports(self, port_set=None, registered_ports=None,
updated_ports=None, port_tags_dict=None):
port_tags_dict = port_tags_dict or {}
with contextlib.nested(
mock.patch.object(self.agent.int_br, 'get_vif_port_set',
return_value=vif_port_set),
mock.patch.object(self.agent, '_get_ofport_names',
return_value=port_set),
mock.patch.object(self.agent.int_br, 'get_port_tag_dict',
return_value=port_tags_dict)
):
@ -395,31 +394,33 @@ class TestOFANeutronAgent(OFAAgentTestCase):
self.assertEqual(expected, actual)
def test_update_ports_returns_lost_vlan_port(self):
br = self.mod_agent.OVSBridge('br-int', 'fake_helper', self.ryuapp)
mac = "ca:fe:de:ad:be:ef"
port = ovs_lib.VifPort(1, 1, 1, mac, br)
port = mock.Mock(port_name='tap00000001-00', ofport=1)
lvm = self.mod_agent.LocalVLANMapping(
1, '1', None, 1, {port.vif_id: port})
vlan=1, network_type='1', physical_network=None, segmentation_id=1,
vif_ports={port.port_name: port})
local_vlan_map = {'1': lvm}
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
port_tags_dict = {1: []}
port_set = set(['tap00000001-00',
'tap00000003-00'])
registered_ports = set(['tap00000001-00', 'tap00000002-00'])
port_tags_dict = {'tap00000001-00': []}
expected = dict(
added=set([3]), current=vif_port_set,
removed=set([2]), updated=set([1])
added=set(['tap00000003-00']),
current=set(['tap00000001-00', 'tap00000003-00']),
removed=set(['tap00000002-00']),
updated=set(['tap00000001-00'])
)
with mock.patch.dict(self.agent.local_vlan_map, local_vlan_map):
actual = self.mock_scan_ports(
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
port_set, registered_ports, port_tags_dict=port_tags_dict)
self.assertEqual(expected, actual)
def test_treat_devices_added_returns_true_for_missing_device(self):
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
side_effect=Exception()),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=mock.Mock())):
self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
mock.patch.object(self.agent, '_get_ports',
return_value=[mock.Mock(port_name='xxx')])):
self.assertTrue(self.agent.treat_devices_added_or_updated(['xxx']))
def _mock_treat_devices_added_updated(self, details, port, func_name):
"""Mock treat devices added or updated.
@ -432,13 +433,14 @@ class TestOFANeutronAgent(OFAAgentTestCase):
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
return_value=details),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=port),
mock.patch.object(self.agent, '_get_ports',
return_value=[port]),
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, func_name)
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
self.assertFalse(self.agent.treat_devices_added_or_updated(
[port.port_name]))
return func.called
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
@ -478,14 +480,15 @@ class TestOFANeutronAgent(OFAAgentTestCase):
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
return_value=fake_details_dict),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=mock.MagicMock()),
mock.patch.object(self.agent, '_get_ports',
return_value=[mock.Mock(port_name='xxx')]),
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, 'treat_vif_port')
) as (get_dev_fn, get_vif_func, upd_dev_up,
upd_dev_down, treat_vif_port):
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
self.assertFalse(self.agent.treat_devices_added_or_updated(
['xxx']))
self.assertTrue(treat_vif_port.called)
self.assertTrue(upd_dev_down.called)
@ -561,7 +564,7 @@ class TestOFANeutronAgent(OFAAgentTestCase):
recl_fn.assert_called_with("123")
def test_port_update(self):
port = {"id": "123",
port = {"id": "b1981919-f516-11e3-a8f4-08606e7f74e7",
"network_id": "124",
"admin_state_up": False}
self.agent.port_update("unused_context",
@ -569,7 +572,7 @@ class TestOFANeutronAgent(OFAAgentTestCase):
network_type="vlan",
segmentation_id="1",
physical_network="physnet")
self.assertEqual(set(['123']), self.agent.updated_ports)
self.assertEqual(set(['tapb1981919-f5']), self.agent.updated_ports)
def test_setup_physical_bridges(self):
with contextlib.nested(
@ -966,6 +969,32 @@ class TestOFANeutronAgent(OFAAgentTestCase):
table_id=ofp.OFPTT_ALL)
sendmsg.assert_has_calls([mock.call(expected_msg)])
def test__get_ports(self):
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
reply = [ofpp.OFPPortDescStatsReply(body=[ofpp.OFPPort(name='hoge',
port_no=8)])]
sendmsg = mock.Mock(return_value=reply)
self.mod_agent.ryu_api.send_msg = sendmsg
result = self.agent._get_ports(self.agent.int_br)
result = list(result) # convert generator to list.
self.assertEqual(1, len(result))
self.assertEqual('hoge', result[0].port_name)
self.assertEqual(8, result[0].ofport)
expected_msg = ofpp.OFPPortDescStatsRequest(
datapath=self.agent.int_br.datapath)
sendmsg.assert_has_calls([mock.call(app=self.agent.ryuapp,
msg=expected_msg, reply_cls=ofpp.OFPPortDescStatsReply,
reply_multi=True)])
def test__get_ofport_names(self):
names = ['p111', 'p222', 'p333']
ps = [mock.Mock(port_name=x, ofport=names.index(x)) for x in names]
with mock.patch.object(self.agent, '_get_ports',
return_value=ps) as _get_ports:
result = self.agent._get_ofport_names('hoge')
_get_ports.assert_called_once_with('hoge')
self.assertEqual(set(names), result)
class AncillaryBridgesTest(OFAAgentTestCase):

View File

@ -0,0 +1,32 @@
# Copyright (C) 2014 VA Linux Systems Japan K.K.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K.
import mock
from neutron.plugins.ofagent.agent import ports
from neutron.tests import base
class TestOFAgentPorts(base.BaseTestCase):
def test_port(self):
p1 = ports.Port(port_name='foo', ofport=999)
ryu_ofp_port = mock.Mock(port_no=999)
ryu_ofp_port.name = 'foo'
p2 = ports.Port.from_ofp_port(ofp_port=ryu_ofp_port)
self.assertEqual(p1.port_name, p2.port_name)
self.assertEqual(p1.ofport, p2.ofport)