Meter flows and ovsdb action for ovs bridge

Add meter flows actions and ovsdb actions for pps
limitation. Meter flow actions are:
* list_meter_features
* create_meter
* delete_meter
* update_meter
* apply_meter_to_port
* remove_meter_from_port

Ovsdb actions are:
* get_port_tag_by_name
* get_value_from_other_config
* set_value_to_other_config
* remove_value_from_other_config

Partially-Implements: bp/packet-rate-limit
Related-Bug: #1938966
Related-Bug: #1912460
Change-Id: Idc9a2b1f39964fc3b603310ac7f22c1bc58d27f7
This commit is contained in:
LIU Yulong 2021-08-11 18:44:38 +08:00
parent cddd2e5ffa
commit 0232ead2c3
5 changed files with 353 additions and 0 deletions

View File

@ -43,6 +43,9 @@ from neutron.common import utils as common_utils
from neutron.conf.agent import ovs_conf from neutron.conf.agent import ovs_conf
from neutron.plugins.ml2.drivers.openvswitch.agent.common \ from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import exceptions as ovs_exc
UINT64_BITMASK = (1 << 64) - 1 UINT64_BITMASK = (1 << 64) - 1
@ -1197,6 +1200,47 @@ class OVSBridge(BaseOVS):
port_type = [port_type] port_type = [port_type]
return [port['name'] for port in ports if port['type'] in port_type] return [port['name'] for port in ports if port['type'] in port_type]
def get_port_tag_by_name(self, port_name):
# At the very beginning of port processing, the port tag
# may not set to ovsdb Port. But, we set the tag to
# other_config.
return self.get_value_from_other_config(port_name, 'tag', int)
def get_value_from_other_config(self, port_name,
key, value_type=None):
try:
other_config = self.db_get_val(
'Port', port_name, 'other_config') or {}
value = other_config.get(key)
if value is not None:
if value_type:
return value_type(value)
return value
except (TypeError, ValueError):
raise ovs_exc.OVSDBPortError(port=port_name)
def set_value_to_other_config(self, port_name, key, value):
other_config = self.db_get_val(
'Port', port_name, 'other_config')
if isinstance(other_config, dict):
other_config[key] = str(value)
# set_db_attribute does not work
with self.ovsdb.transaction() as txn:
txn.add(
self.ovsdb.db_set('Port', port_name,
('other_config', other_config)))
def remove_value_from_other_config(self, port_name, key):
other_config = self.db_get_val(
'Port', port_name, 'other_config')
if isinstance(other_config, dict):
other_config.pop(key, None)
# set_db_attribute does not work
with self.ovsdb.transaction() as txn:
txn.add(self.ovsdb.db_clear('Port', port_name, "other_config"))
txn.add(self.ovsdb.db_set('Port', port_name,
('other_config', other_config)))
def __enter__(self): def __enter__(self):
self.create() self.create()
return self return self

View File

@ -0,0 +1,21 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import exceptions
from neutron._i18n import _
class OVSDBPortError(exceptions.NeutronException):
message = _("Port %(port)s is not found in ovsdb. "
"Or the requested value type is invalid.")

View File

@ -1,5 +1,6 @@
# Copyright (C) 2014,2015 VA Linux Systems Japan K.K. # Copyright (C) 2014,2015 VA Linux Systems Japan K.K.
# Copyright (C) 2014,2015 YAMAMOTO Takashi <yamamoto at valinux co jp> # Copyright (C) 2014,2015 YAMAMOTO Takashi <yamamoto at valinux co jp>
# Copyright (c) 2021-2022 Chinaunicom
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -433,6 +434,96 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge,
match=match, match=match,
dest_table_id=constants.ARP_SPOOF_TABLE) dest_table_id=constants.ARP_SPOOF_TABLE)
def list_meter_features(self):
(dp, _ofp, ofpp) = self._get_dp()
req = ofpp.OFPMeterFeaturesStatsRequest(dp, 0)
rep = self._send_msg(req, reply_cls=ofpp.OFPMeterFeaturesStatsReply)
features = []
for stat in rep.body:
features.append({"max_meter": stat.max_meter,
"band_types": stat.band_types,
"capabilities": stat.capabilities,
"max_bands": stat.max_bands,
"max_color": stat.max_color})
return features
def create_meter(self, meter_id, rate, burst=0):
(dp, ofp, ofpp) = self._get_dp()
bands = [
ofpp.OFPMeterBandDrop(rate=rate, burst_size=burst)]
req = ofpp.OFPMeterMod(datapath=dp, command=ofp.OFPMC_ADD,
flags=ofp.OFPMF_PKTPS, meter_id=meter_id,
bands=bands)
self._send_msg(req)
def delete_meter(self, meter_id):
(dp, ofp, ofpp) = self._get_dp()
req = ofpp.OFPMeterMod(datapath=dp, command=ofp.OFPMC_DELETE,
flags=ofp.OFPMF_PKTPS, meter_id=meter_id)
self._send_msg(req)
def update_meter(self, meter_id, rate, burst=0):
(dp, ofp, ofpp) = self._get_dp()
bands = [
ofpp.OFPMeterBandDrop(rate=rate, burst_size=burst)]
req = ofpp.OFPMeterMod(datapath=dp, command=ofp.OFPMC_MODIFY,
flags=ofp.OFPMF_PKTPS, meter_id=meter_id,
bands=bands)
self._send_msg(req)
def apply_meter_to_port(self, meter_id, direction, mac,
in_port=None, local_vlan=None):
"""Add meter flows to port.
Ingress: match dst MAC and local_vlan ID
Egress: match src MAC and OF in_port
"""
(_dp, ofp, ofpp) = self._get_dp()
if direction == lib_consts.EGRESS_DIRECTION and in_port:
match = ofpp.OFPMatch(in_port=in_port, eth_src=mac)
elif direction == lib_consts.INGRESS_DIRECTION and local_vlan:
vlan_vid = local_vlan | ofp.OFPVID_PRESENT
match = ofpp.OFPMatch(vlan_vid=vlan_vid, eth_dst=mac)
else:
LOG.warning("Invalid inputs to add meter flows to port.")
return
instructions = [
ofpp.OFPInstructionMeter(meter_id, type_=ofp.OFPIT_METER),
ofpp.OFPInstructionGotoTable(table_id=constants.TRANSIENT_TABLE)]
self.install_instructions(table_id=constants.PACKET_RATE_LIMIT,
priority=100,
instructions=instructions,
match=match)
def remove_meter_from_port(self, direction, mac,
in_port=None, local_vlan=None):
"""Remove meter flows from port.
Ingress: match dst MAC and local_vlan ID
Egress: match src MAC and OF in_port
"""
(_dp, ofp, ofpp) = self._get_dp()
if direction == lib_consts.EGRESS_DIRECTION and in_port:
match = ofpp.OFPMatch(in_port=in_port, eth_src=mac)
elif direction == lib_consts.INGRESS_DIRECTION and local_vlan:
vlan_vid = local_vlan | ofp.OFPVID_PRESENT
match = ofpp.OFPMatch(vlan_vid=vlan_vid, eth_dst=mac)
else:
LOG.warning("Invalid inputs to remove meter flows from port.")
return
self.uninstall_flows(table_id=constants.PACKET_RATE_LIMIT,
match=match)
def delete_arp_spoofing_protection(self, port): def delete_arp_spoofing_protection(self, port):
(_dp, ofp, ofpp) = self._get_dp() (_dp, ofp, ofpp) = self._get_dp()
match = self._arp_reply_match(ofp, ofpp, port=port) match = self._arp_reply_match(ofp, ofpp, port=port)

View File

@ -25,6 +25,8 @@ from neutron.agent.common import ovs_lib
from neutron.agent.common import utils from neutron.agent.common import utils
from neutron.plugins.ml2.drivers.openvswitch.agent.common \ from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants as p_const import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import exceptions as ovs_exc
from neutron.tests import base from neutron.tests import base
@ -230,6 +232,85 @@ class OVS_Lib_Test(base.BaseTestCase):
mock.call('Bridge', self.BR_NAME, mock.call('Bridge', self.BR_NAME,
'protocols', p_const.OPENFLOW13)]) 'protocols', p_const.OPENFLOW13)])
def test_get_port_tag_by_name(self):
self.br = ovs_lib.OVSBridge(self.BR_NAME)
port_name = "fake-port"
with mock.patch.object(self.br, 'db_get_val') as db_get_val:
self.br.get_port_tag_by_name(port_name)
db_get_val.assert_called_once_with(
'Port', port_name, 'other_config')
def test_get_value_from_other_config(self):
self.br = ovs_lib.OVSBridge(self.BR_NAME)
value = "test_value"
other_config = {"test_key": value}
port_name = "fake-port"
with mock.patch.object(self.br, 'db_get_val') as db_get_val:
db_get_val.return_value = other_config
v = self.br.get_value_from_other_config(port_name, "test_key")
self.assertEqual(value, v)
def test_get_value_from_other_config_value_error(self):
self.br = ovs_lib.OVSBridge(self.BR_NAME)
value = "test_value"
other_config = {"test_key": value}
port_name = "fake-port"
with mock.patch.object(self.br, 'db_get_val') as db_get_val:
db_get_val.return_value = other_config
self.assertRaises(ovs_exc.OVSDBPortError,
self.br.get_value_from_other_config,
port_name, "test_key", int)
def test_get_value_from_other_config_not_found(self):
self.br = ovs_lib.OVSBridge(self.BR_NAME)
value = "test_value"
other_config = {"test_key": value}
port_name = "fake-port"
with mock.patch.object(self.br, 'db_get_val') as db_get_val:
db_get_val.return_value = other_config
self.assertIsNone(
self.br.get_value_from_other_config(
port_name, "key_not_exist"))
def test_set_value_to_other_config(self):
self.br = ovs_lib.OVSBridge(self.BR_NAME)
value = "test_value"
other_config = {"test_key": value}
port_name = "fake-port"
with mock.patch.object(self.br, 'db_get_val') as db_get_val, \
mock.patch.object(self.br.ovsdb, 'db_set') as set_db:
new_key = "new_key"
new_value = "new_value"
db_get_val.return_value = other_config
self.br.set_value_to_other_config(port_name, key=new_key,
value=new_value)
db_get_val.assert_called_once_with('Port', port_name,
'other_config')
other_config.update({new_key: new_value})
set_db.assert_called_once_with(
'Port', port_name, ('other_config', other_config))
def test_remove_value_from_other_config(self):
self.br = ovs_lib.OVSBridge(self.BR_NAME)
old_key = "old_key"
old_value = "old_value"
other_config = {old_key: old_value}
port_name = "fake-port"
with mock.patch.object(self.br, 'db_get_val') as db_get_val, \
mock.patch.object(self.br.ovsdb, 'db_clear') as db_clear, \
mock.patch.object(self.br.ovsdb, 'db_set') as set_db:
db_get_val.return_value = other_config
self.br.remove_value_from_other_config(port_name, key=old_key)
db_get_val.assert_called_once_with('Port', port_name,
'other_config')
db_clear.assert_called_once_with(
'Port', port_name, "other_config")
set_db.assert_called_once_with(
'Port', port_name, ('other_config', {}))
def test_add_flow_timeout_set(self): def test_add_flow_timeout_set(self):
flow_dict = collections.OrderedDict([ flow_dict = collections.OrderedDict([
('cookie', 1234), ('cookie', 1234),

View File

@ -669,6 +669,122 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase):
def test_delete_dvr_dst_mac_for_flat(self): def test_delete_dvr_dst_mac_for_flat(self):
self._test_delete_dvr_dst_mac_for_arp(network_type='flat') self._test_delete_dvr_dst_mac_for_arp(network_type='flat')
def test_list_meter_features(self):
(dp, ofp, ofpp) = self._get_dp()
self.br.list_meter_features()
self.assertIn(
call._send_msg(ofpp.OFPMeterFeaturesStatsRequest(dp, 0),
reply_cls=ofpp.OFPMeterFeaturesStatsReply),
self.mock.mock_calls)
def test_create_meter(self):
meter_id = 1
rate = 2
burst = 0
(dp, ofp, ofpp) = self._get_dp()
self.br.create_meter(meter_id, rate)
bands = [
ofpp.OFPMeterBandDrop(rate=rate, burst_size=burst)]
req = ofpp.OFPMeterMod(datapath=dp, command=ofp.OFPMC_ADD,
flags=ofp.OFPMF_PKTPS, meter_id=meter_id,
bands=bands)
expected = [call._send_msg(req)]
self.assertEqual(expected, self.mock.mock_calls)
def test_delete_meter(self):
meter_id = 1
(dp, ofp, ofpp) = self._get_dp()
self.br.delete_meter(meter_id)
req = ofpp.OFPMeterMod(datapath=dp, command=ofp.OFPMC_DELETE,
flags=ofp.OFPMF_PKTPS, meter_id=meter_id)
expected = [call._send_msg(req)]
self.assertEqual(expected, self.mock.mock_calls)
def test_update_meter(self):
meter_id = 1
rate = 2
burst = 0
(dp, ofp, ofpp) = self._get_dp()
self.br.update_meter(meter_id, rate)
bands = [
ofpp.OFPMeterBandDrop(rate=rate, burst_size=burst)]
req = ofpp.OFPMeterMod(datapath=dp, command=ofp.OFPMC_MODIFY,
flags=ofp.OFPMF_PKTPS, meter_id=meter_id,
bands=bands)
expected = [call._send_msg(req)]
self.assertEqual(expected, self.mock.mock_calls)
def _test_apply_meter_to_port(self, direction, mac,
in_port=None, local_vlan=None):
meter_id = 1
(dp, ofp, ofpp) = self._get_dp()
self.br.apply_meter_to_port(meter_id, direction, mac,
in_port, local_vlan)
if direction == p_const.EGRESS_DIRECTION and in_port:
match = ofpp.OFPMatch(in_port=in_port, eth_src=mac)
elif direction == p_const.INGRESS_DIRECTION and local_vlan:
vlan_vid = local_vlan | ofp.OFPVID_PRESENT
match = ofpp.OFPMatch(vlan_vid=vlan_vid, eth_dst=mac)
instructions = [
ofpp.OFPInstructionMeter(meter_id, type_=ofp.OFPIT_METER),
ofpp.OFPInstructionGotoTable(table_id=constants.TRANSIENT_TABLE)]
expected = [
call._send_msg(ofpp.OFPFlowMod(dp,
cookie=self.stamp,
instructions=instructions,
match=match,
priority=100,
table_id=constants.PACKET_RATE_LIMIT),
active_bundle=None)
]
self.assertEqual(expected, self.mock.mock_calls)
def test_apply_meter_to_port_egress(self):
self._test_apply_meter_to_port(p_const.EGRESS_DIRECTION,
mac="00:02:b3:13:fe:3e",
in_port=1)
def test_apply_meter_to_port_ingress(self):
self._test_apply_meter_to_port(p_const.INGRESS_DIRECTION,
mac="00:02:b3:13:fe:3e",
local_vlan=1)
def _test_remove_meter_from_port(self, direction, mac,
in_port=None, local_vlan=None):
(_dp, ofp, ofpp) = self._get_dp()
self.br.remove_meter_from_port(direction,
mac, in_port, local_vlan)
if direction == p_const.EGRESS_DIRECTION and in_port:
match = ofpp.OFPMatch(in_port=in_port, eth_src=mac)
elif direction == p_const.INGRESS_DIRECTION and local_vlan:
vlan_vid = local_vlan | ofp.OFPVID_PRESENT
match = ofpp.OFPMatch(vlan_vid=vlan_vid, eth_dst=mac)
expected = [
call.uninstall_flows(
table_id=constants.PACKET_RATE_LIMIT,
match=match)
]
self.assertEqual(expected, self.mock.mock_calls)
def test_remove_meter_from_port_egress(self):
self._test_remove_meter_from_port(p_const.EGRESS_DIRECTION,
mac="00:02:b3:13:fe:3e",
in_port=1)
def test_remove_meter_from_port_ingress(self):
self._test_remove_meter_from_port(p_const.INGRESS_DIRECTION,
mac="00:02:b3:13:fe:3e",
local_vlan=1)
def test_install_dscp_marking_rule(self): def test_install_dscp_marking_rule(self):
test_port = 8888 test_port = 8888
test_mark = 38 test_mark = 38