test_ofctl: Add unit test for ofctl_v1_0

Signed-off-by: Minoru TAKAHASHI <takahashi.minoru7@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Minoru TAKAHASHI
2015-06-30 16:22:03 +09:00
committed by FUJITA Tomonori
parent 576d186351
commit 45f3db2640
2 changed files with 220 additions and 134 deletions

View File

@@ -17,9 +17,13 @@ import unittest
import logging
import netaddr
import functools
import inspect
from nose.tools import *
from ryu.lib import addrconv
from ryu.lib import ofctl_v1_0
from ryu.ofproto import ofproto_v1_0, ofproto_v1_0_parser
from ryu.lib import ofctl_v1_2
from ryu.ofproto import ofproto_v1_2, ofproto_v1_2_parser
from ryu.lib import ofctl_v1_3
@@ -133,20 +137,24 @@ class Test_ofctl(unittest.TestCase):
ok_(isinstance(insts, cls))
eq_(insts.meter_id, act["meter_id"])
else:
ok_(isinstance(insts.actions[0], cls))
if test.ver == ofproto_v1_0.OFP_VERSION:
action = insts
else:
action = insts.actions[0]
ok_(isinstance(action, cls))
if act_type == 'OUTPUT':
eq_(insts.actions[0].port, act["port"])
eq_(action.port, act["port"])
elif act_type == 'SET_MPLS_TTL':
eq_(insts.actions[0].mpls_ttl, act["mpls_ttl"])
eq_(action.mpls_ttl, act["mpls_ttl"])
elif act_type in ['PUSH_VLAN', 'PUSH_MPLS',
'POP_MPLS', 'PUSH_PBB']:
eq_(insts.actions[0].ethertype, act["ethertype"])
eq_(action.ethertype, act["ethertype"])
elif act_type == 'SET_QUEUE':
eq_(insts.actions[0].queue_id, act["queue_id"])
eq_(action.queue_id, act["queue_id"])
elif act_type == 'GROUP':
eq_(insts.actions[0].group_id, act["group_id"])
eq_(action.group_id, act["group_id"])
elif act_type == 'SET_NW_TTL':
eq_(insts.actions[0].nw_ttl, act["nw_ttl"])
eq_(action.nw_ttl, act["nw_ttl"])
# action -> str
action_str = actions_to_str(result)
action_str_list = action_str[0].split(':')
@@ -188,49 +196,13 @@ class Test_ofctl(unittest.TestCase):
dp = ofproto_protocol.ProtocolDesc(version=test.ver)
ofproto = dp.ofproto
vid_present = dp.ofproto.OFPVID_PRESENT
expected_value = {
"vlan_vid": {
0: {"to_match": 0 | vid_present, "to_str": "0"},
3: {"to_match": 3 | vid_present, "to_str": "3"},
4095: {"to_match": 4095 | vid_present, "to_str": "4095"},
"0": {"to_match": 0 | vid_present, "to_str": "0"},
"3": {"to_match": 3 | vid_present, "to_str": "3"},
"4095": {"to_match": 4095 | vid_present, "to_str": "4095"},
"0x0000": {"to_match": 0x0000, "to_str": "0x0000"},
"0x0003": {"to_match": 0x0003, "to_str": "0x0003"},
"0x0fff": {"to_match": 0x0fff, "to_str": "0x0fff"},
"0x1000": {"to_match": 0x1000, "to_str": "0"},
"0x1003": {"to_match": 0x1003, "to_str": "3"},
"0x1fff": {"to_match": 0x1fff, "to_str": "4095"},
"4096/4096": {"to_match": (4096, 4096),
"to_str": "0x1000/0x1000"},
"4096/4097": {"to_match": (4096, 4097),
"to_str": "0x1000/0x1001"},
"2744/2748": {"to_match": (2744, 2748),
"to_str": "0x0ab8/0x0abc"},
"2748/2748": {"to_match": (2748, 2748),
"to_str": "0x0abc/0x0abc"},
"2748/2749": {"to_match": (2748, 2749),
"to_str": "0x0abc/0x0abd"},
"0x1000/0x1000": {"to_match": (0x1000, 0x1000),
"to_str": "0x1000/0x1000"},
"0x1000/0x1001": {"to_match": (0x1000, 0x1001),
"to_str": "0x1000/0x1001"},
"0x0ab8/0x0abc": {"to_match": (0x0ab8, 0x0abc),
"to_str": "0x0ab8/0x0abc"},
"0x0abc/0x0abc": {"to_match": (0x0abc, 0x0abc),
"to_str": "0x0abc/0x0abc"},
"0x0abc/0x0abd": {"to_match": (0x0abc, 0x0abd),
"to_str": "0x0abc/0x0abd"}
}
}
# str -> match
match = to_match(dp, attrs)
def equal_match(key, value, match):
field_value = match[key]
key = self._conv_key(test, key, attrs)
field_value = self._get_field_value(test, key, match)
if key in ['eth_src', 'eth_dst', 'arp_sha', 'arp_tha']:
# MAC address
eth, mask = _to_match_eth(value)
@@ -244,6 +216,11 @@ class Test_ofctl(unittest.TestCase):
# without mask
eq_(eth, field_value)
return
elif key in ['dl_src', 'dl_dst']:
eth, mask = _to_match_eth(value)
field_value = addrconv.mac.bin_to_text(field_value)
eq_(eth, field_value)
return
elif key in ['ipv4_src', 'ipv4_dst', 'arp_spa', 'arp_tpa']:
# IPv4 address
ipv4, mask = _to_match_ip(value)
@@ -255,6 +232,18 @@ class Test_ofctl(unittest.TestCase):
# without mask
eq_(ipv4, field_value)
return
elif key in ['nw_src', 'nw_dst']:
# IPv4 address
ipv4, mask = _to_match_ip(value)
field_value = _to_match_ip(field_value)
if mask is not None:
# with mask
eq_(ipv4, field_value[0])
eq_(mask, field_value[1])
else:
# without mask
eq_(ipv4, field_value[0])
return
elif key in ['ipv6_src', 'ipv6_dst']:
# IPv6 address
ipv6, mask = _to_match_ip(value)
@@ -267,7 +256,11 @@ class Test_ofctl(unittest.TestCase):
eq_(ipv6, field_value)
return
elif key == 'vlan_vid':
eq_(expected_value['vlan_vid'][value]['to_match'], field_value)
if test.ver == ofproto_v1_0.OFP_VERSION:
eq_(value, field_value)
else:
eq_(test.expected_value['vlan_vid'][
value]['to_match'], field_value)
return
elif key == 'metadata' or key == 'ipv6_exthdr':
# Metadata or IPv6 Extension Header pseudo-field
@@ -286,20 +279,7 @@ class Test_ofctl(unittest.TestCase):
return
for key, value in attrs.items():
if key in conv_of10_to_of12_dict:
# For old field name
key_new = conv_of10_to_of12_dict[key]
elif key == 'tp_src' or key == 'tp_dst':
# TCP/UDP port
conv = {inet.IPPROTO_TCP: {'tp_src': 'tcp_src',
'tp_dst': 'tcp_dst'},
inet.IPPROTO_UDP: {'tp_src': 'udp_src',
'tp_dst': 'udp_dst'}}
ip_proto = attrs.get('nw_proto', attrs.get('ip_proto', 0))
key_new = conv[ip_proto][key]
else:
key_new = key
equal_match(key_new, value, match)
equal_match(key, value, match)
# match -> str
match_str = match_to_str(match)
@@ -322,15 +302,26 @@ class Test_ofctl(unittest.TestCase):
return
elif key in['nw_src', 'nw_dst', 'arp_spa', 'arp_tpa']:
# IPv4 address
ipv4, mask = _to_match_ip(value)
if mask is not None:
# with mask
field_value = field_value.split('/')
eq_(ipv4, field_value[0])
eq_(mask, field_value[1])
if test.ver == ofproto_v1_0.OFP_VERSION:
ipv4, mask = _to_match_ip(value)
field_value = _to_match_ip(field_value)
if mask is not None:
# with mask
eq_(ipv4, field_value[0])
eq_(mask, field_value[1])
else:
# without mask
eq_(ipv4, field_value[0])
else:
# without mask
eq_(ipv4, field_value)
ipv4, mask = _to_match_ip(value)
if mask is not None:
# with mask
field_value = field_value.split('/')
eq_(ipv4, field_value[0])
eq_(mask, field_value[1])
else:
# without mask
eq_(ipv4, field_value)
return
elif key in ['ipv6_src', 'ipv6_dst']:
# IPv6 address
@@ -345,7 +336,11 @@ class Test_ofctl(unittest.TestCase):
eq_(ipv6, field_value)
return
elif key == 'dl_vlan':
eq_(expected_value['vlan_vid'][value]['to_str'], field_value)
if test.ver == ofproto_v1_0.OFP_VERSION:
eq_(value, field_value)
else:
eq_(test.expected_value['vlan_vid'][
value]['to_str'], field_value)
return
elif key == 'metadata' or key == 'ipv6_exthdr':
# Metadata or IPv6 Extension Header pseudo-field
@@ -371,10 +366,114 @@ class Test_ofctl(unittest.TestCase):
key_old = key
equal_str(key_old, value, match_str)
def _conv_key(self, test, key, attrs):
if test.ver != ofproto_v1_0.OFP_VERSION:
if key in conv_of10_to_of12_dict:
# For old field name
key = conv_of10_to_of12_dict[key]
elif key == 'tp_src' or key == 'tp_dst':
# TCP/UDP port
conv = {inet.IPPROTO_TCP: {'tp_src': 'tcp_src',
'tp_dst': 'tcp_dst'},
inet.IPPROTO_UDP: {'tp_src': 'udp_src',
'tp_dst': 'udp_dst'}}
ip_proto = attrs.get('nw_proto', attrs.get('ip_proto', 0))
key = conv[ip_proto][key]
return key
def _get_field_value(self, test, key, match):
if test.ver == ofproto_v1_0.OFP_VERSION:
members = inspect.getmembers(match)
for member in members:
if member[0] == key:
field_value = member[1]
elif member[0] == 'wildcards':
wildcards = member[1]
if key == 'nw_src':
field_value = test.nw_src_to_str(wildcards, field_value)
elif key == 'nw_dst':
field_value = test.nw_dst_to_str(wildcards, field_value)
else:
field_value = match[key]
return field_value
""" Test_data for of_v1_0 """
class test_data_v1_0():
def __init__(self):
self.supported_action = {}
self.supported_match = {}
self.act_list = [
{'type': 'OUTPUT', 'port': 3},
{'type': 'SET_VLAN_VID', 'vlan_vid': 5},
{'type': 'SET_VLAN_PCP', 'vlan_pcp': 3},
{'type': 'STRIP_VLAN'},
{'type': 'SET_DL_SRC', 'dl_src': 'aa:bb:cc:11:22:33'},
{'type': 'SET_DL_DST', 'dl_dst': 'aa:bb:cc:11:22:33'},
{'type': 'SET_NW_SRC', 'nw_src': '10.0.0.1'},
{'type': 'SET_NW_DST', 'nw_dst': '10.0.0.1'},
{'type': 'SET_NW_TOS', 'nw_tos': 184},
{'type': 'SET_TP_SRC', 'tp_src': 8080},
{'type': 'SET_TP_DST', 'tp_dst': 8080},
{'type': 'ENQUEUE', 'queue_id': 3, 'port': 1}
]
self.attr_list = [
{'in_port': 7},
{'dl_src': 'aa:bb:cc:11:22:33'},
{'dl_dst': 'aa:bb:cc:11:22:33'},
{'dl_vlan': 5},
{'dl_vlan_pcp': 3},
{'dl_type': 123},
{'nw_tos': 16},
{'nw_proto': 5},
{'nw_src': '192.168.0.1'},
{'nw_src': '192.168.0.1/24'},
{'nw_dst': '192.168.0.1'},
{'nw_dst': '192.168.0.1/24'},
{'tp_src': 1},
{'tp_dst': 2}
]
def set_ver(self, ver):
self.ver = ver
def set_attr(self, ofctl):
self.to_match = getattr(ofctl, "to_match")
self.match_to_str = getattr(ofctl, "match_to_str")
self.to_actions = getattr(ofctl, "to_actions")
self.actions_to_str = getattr(ofctl, "actions_to_str")
self.nw_src_to_str = getattr(ofctl, "nw_src_to_str")
self.nw_dst_to_str = getattr(ofctl, "nw_dst_to_str")
def set_expected_value(self, ofproto):
pass
def set_action_v1_0(self, parser):
self.supported_action.update(
{
'OUTPUT': getattr(parser, "OFPActionOutput"),
'SET_VLAN_VID': getattr(parser, "OFPActionVlanVid"),
'SET_VLAN_PCP': getattr(parser, "OFPActionVlanPcp"),
'STRIP_VLAN': getattr(parser, "OFPActionStripVlan"),
'SET_DL_SRC': getattr(parser, "OFPActionSetDlSrc"),
'SET_DL_DST': getattr(parser, "OFPActionSetDlDst"),
'SET_NW_SRC': getattr(parser, "OFPActionSetNwSrc"),
'SET_NW_DST': getattr(parser, "OFPActionSetNwDst"),
'SET_NW_TOS': getattr(parser, "OFPActionSetNwTos"),
'SET_TP_SRC': getattr(parser, "OFPActionSetTpSrc"),
'SET_TP_DST': getattr(parser, "OFPActionSetTpDst"),
'ENQUEUE': getattr(parser, "OFPActionEnqueue")
})
""" Test_data for of_v1_2 """
class test_data_v1_2():
class test_data_v1_2(test_data_v1_0):
def __init__(self):
self.supported_action = {}
@@ -513,15 +612,51 @@ class test_data_v1_2():
{'mpls_tc': 2, 'eth_type': 0x8848}
]
def set_ver(self, ver):
self.ver = ver
def set_attr(self, ofctl):
self.to_match = getattr(ofctl, "to_match")
self.match_to_str = getattr(ofctl, "match_to_str")
self.to_actions = getattr(ofctl, "to_actions")
self.actions_to_str = getattr(ofctl, "actions_to_str")
def set_expected_value(self, ofproto):
vid_present = ofproto.OFPVID_PRESENT
self.expected_value = {
"vlan_vid": {
0: {"to_match": 0 | vid_present, "to_str": "0"},
3: {"to_match": 3 | vid_present, "to_str": "3"},
4095: {"to_match": 4095 | vid_present, "to_str": "4095"},
"0": {"to_match": 0 | vid_present, "to_str": "0"},
"3": {"to_match": 3 | vid_present, "to_str": "3"},
"4095": {"to_match": 4095 | vid_present, "to_str": "4095"},
"0x0000": {"to_match": 0x0000, "to_str": "0x0000"},
"0x0003": {"to_match": 0x0003, "to_str": "0x0003"},
"0x0fff": {"to_match": 0x0fff, "to_str": "0x0fff"},
"0x1000": {"to_match": 0x1000, "to_str": "0"},
"0x1003": {"to_match": 0x1003, "to_str": "3"},
"0x1fff": {"to_match": 0x1fff, "to_str": "4095"},
"4096/4096": {"to_match": (4096, 4096),
"to_str": "0x1000/0x1000"},
"4096/4097": {"to_match": (4096, 4097),
"to_str": "0x1000/0x1001"},
"2744/2748": {"to_match": (2744, 2748),
"to_str": "0x0ab8/0x0abc"},
"2748/2748": {"to_match": (2748, 2748),
"to_str": "0x0abc/0x0abc"},
"2748/2749": {"to_match": (2748, 2749),
"to_str": "0x0abc/0x0abd"},
"0x1000/0x1000": {"to_match": (0x1000, 0x1000),
"to_str": "0x1000/0x1000"},
"0x1000/0x1001": {"to_match": (0x1000, 0x1001),
"to_str": "0x1000/0x1001"},
"0x0ab8/0x0abc": {"to_match": (0x0ab8, 0x0abc),
"to_str": "0x0ab8/0x0abc"},
"0x0abc/0x0abc": {"to_match": (0x0abc, 0x0abc),
"to_str": "0x0abc/0x0abc"},
"0x0abc/0x0abd": {"to_match": (0x0abc, 0x0abd),
"to_str": "0x0abc/0x0abd"}
}
}
def set_action_v1_2(self, parser):
self.supported_action.update(
{
@@ -689,12 +824,22 @@ def _add_tests_match(cls):
""" Test case """
# for of10
cls = test_data_v1_0()
cls.set_action_v1_0(ofproto_v1_0_parser)
cls.set_ver(ofproto_v1_0.OFP_VERSION)
cls.set_attr(ofctl_v1_0)
cls.set_expected_value(ofproto_v1_0)
_add_tests_actions(cls)
_add_tests_match(cls)
# for of12
cls = test_data_v1_2()
cls.set_action_v1_2(ofproto_v1_2_parser)
cls.set_match_v1_2(ofproto_v1_2_parser)
cls.set_ver(ofproto_v1_2.OFP_VERSION)
cls.set_attr(ofctl_v1_2)
cls.set_expected_value(ofproto_v1_2)
_add_tests_actions(cls)
_add_tests_match(cls)
@@ -704,6 +849,7 @@ cls.set_action_v1_3(ofproto_v1_3_parser)
cls.set_match_v1_3(ofproto_v1_3_parser)
cls.set_ver(ofproto_v1_3.OFP_VERSION)
cls.set_attr(ofctl_v1_3)
cls.set_expected_value(ofproto_v1_3)
_add_tests_actions(cls)
_add_tests_match(cls)

View File

@@ -1,60 +0,0 @@
# Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import unittest
import logging
from nose.tools import *
from ryu.lib import ofctl_v1_0
from ryu.ofproto import ofproto_v1_0, ofproto_v1_0_parser
from ryu.ofproto import ofproto_protocol
LOG = logging.getLogger('test_ofctl_v1_0')
class Test_ofctl_v1_0(unittest.TestCase):
""" Test case for ofctl_v1_0
"""
def setUp(self):
self.dp = ofproto_protocol.ProtocolDesc(
version=ofproto_v1_0.OFP_VERSION)
self.attrs_list = [
{"in_port": 3},
{"dl_vlan": 3},
{"dl_src": "11:11:11:11:11:11"},
{"dl_dst": "11:11:11:11:11:12"},
{"nw_tos": 16, "dl_type": 2048},
{"nw_proto": 5, "dl_type": 2048},
{"tp_src": 1, "nw_proto": 6, "dl_type": 2048},
{"tp_dst": 2, "nw_proto": 6, "dl_type": 2048},
{"nw_src": "192.168.1.5", "dl_type": 2048},
{"nw_dst": "192.168.1.5/12", "dl_type": 2048},
{"nw_dst": "192.168.1.5/1"},
{"nw_dst": "192.168.1.5/12"},
{"dl_vlan_pcp": 3}
]
def tearDown(self):
pass
def test_match_to_str(self):
for attrs in self.attrs_list:
match = ofctl_v1_0.to_match(self.dp, attrs)
str = ofctl_v1_0.match_to_str(match)
eq_(attrs, str)