test: add unittests for packet library

Signed-off-by: HIYAMA Manabu <hiyama.manabu@po.ntts.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
HIYAMA Manabu 2012-10-11 17:54:37 +09:00 committed by FUJITA Tomonori
parent c92d8be079
commit 21f29c6f41
4 changed files with 316 additions and 0 deletions

View File

View File

@ -0,0 +1,176 @@
# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import unittest
import logging
import struct
import netaddr
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.arp import arp
from ryu.lib.packet.vlan import vlan
LOG = logging.getLogger('test_arp')
class Test_arp(unittest.TestCase):
""" Test case for arp
"""
hwtype = 1
proto = 0x0800
hlen = 6
plen = 4
opcode = 1
src_mac = mac.haddr_to_bin('00:07:0d:af:f4:54')
src_ip = int(netaddr.IPAddress('24.166.172.1'))
dst_mac = mac.haddr_to_bin('00:00:00:00:00:00')
dst_ip = int(netaddr.IPAddress('24.166.173.159'))
fmt = arp._PACK_STR
length = struct.calcsize(arp._PACK_STR)
buf = pack(fmt, hwtype, proto, hlen, plen, opcode, src_mac, src_ip,
dst_mac, dst_ip)
a = arp(hwtype, proto, hlen, plen, opcode, src_mac, src_ip, dst_mac,
dst_ip)
def setUp(self):
pass
def tearDown(self):
pass
def find_protocol(self, pkt, name):
for p in pkt.protocols:
if p.protocol_name == name:
return p
def test_init(self):
eq_(self.hwtype, self.a.hwtype)
eq_(self.proto, self.a.proto)
eq_(self.hlen, self.a.hlen)
eq_(self.plen, self.a.plen)
eq_(self.opcode, self.a.opcode)
eq_(self.src_mac, self.a.src_mac)
eq_(self.src_ip, self.a.src_ip)
eq_(self.dst_mac, self.a.dst_mac)
eq_(self.dst_ip, self.a.dst_ip)
eq_(self.length, self.a.length)
def test_parser(self):
_res = self.a.parser(self.buf)
if type(_res) is tuple:
res = _res[0]
else:
res = _res
eq_(res.hwtype, self.hwtype)
eq_(res.proto, self.proto)
eq_(res.hlen, self.hlen)
eq_(res.plen, self.plen)
eq_(res.opcode, self.opcode)
eq_(res.src_mac, self.src_mac)
eq_(res.src_ip, self.src_ip)
eq_(res.dst_mac, self.dst_mac)
eq_(res.dst_ip, self.dst_ip)
eq_(res.length, self.length)
def test_serialize(self):
data = bytearray()
prev = None
buf = self.a.serialize(data, prev)
fmt = arp._PACK_STR
res = struct.unpack(fmt, buf)
eq_(res[0], self.hwtype)
eq_(res[1], self.proto)
eq_(res[2], self.hlen)
eq_(res[3], self.plen)
eq_(res[4], self.opcode)
eq_(res[5], self.src_mac)
eq_(res[6], self.src_ip)
eq_(res[7], self.dst_mac)
eq_(res[8], self.dst_ip)
def _build_arp(self, vlan_enabled):
if vlan_enabled is True:
ethertype = ether.ETH_TYPE_8021Q
v = vlan(1, 1, 3, ether.ETH_TYPE_ARP)
else:
ethertype = ether.ETH_TYPE_ARP
e = ethernet(self.dst_mac, self.src_mac, ethertype)
p = Packet()
p.add_protocol(e)
if vlan_enabled is True:
p.add_protocol(v)
p.add_protocol(self.a)
p.serialize()
return p
def test_build_arp_vlan(self):
p = self._build_arp(True)
e = self.find_protocol(p, "ethernet")
ok_(e)
eq_(e.ethertype, ether.ETH_TYPE_8021Q)
v = self.find_protocol(p, "vlan")
ok_(v)
eq_(v.ethertype, ether.ETH_TYPE_ARP)
a = self.find_protocol(p, "arp")
ok_(a)
eq_(a.hwtype, self.hwtype)
eq_(a.proto, self.proto)
eq_(a.hlen, self.hlen)
eq_(a.plen, self.plen)
eq_(a.opcode, self.opcode)
eq_(a.src_mac, self.src_mac)
eq_(a.src_ip, self.src_ip)
eq_(a.dst_mac, self.dst_mac)
eq_(a.dst_ip, self.dst_ip)
def test_build_arp_novlan(self):
p = self._build_arp(False)
e = self.find_protocol(p, "ethernet")
ok_(e)
eq_(e.ethertype, ether.ETH_TYPE_ARP)
a = self.find_protocol(p, "arp")
ok_(a)
eq_(a.hwtype, self.hwtype)
eq_(a.proto, self.proto)
eq_(a.hlen, self.hlen)
eq_(a.plen, self.plen)
eq_(a.opcode, self.opcode)
eq_(a.src_mac, self.src_mac)
eq_(a.src_ip, self.src_ip)
eq_(a.dst_mac, self.dst_mac)
eq_(a.dst_ip, self.dst_ip)
eq_(a.length, self.length)

View File

@ -0,0 +1,139 @@
# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import unittest
import logging
import struct
import netaddr
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.ipv4 import ipv4
from ryu.lib.packet.vlan import vlan
LOG = logging.getLogger('test_vlan')
class Test_vlan(unittest.TestCase):
""" Test case for vlan
"""
pcp = 0
cfi = 0
vid = 32
tci = pcp << 15 | cfi << 12 | vid
ethertype = ether.ETH_TYPE_IP
length = struct.calcsize(vlan._PACK_STR)
buf = pack(vlan._PACK_STR, tci, ethertype)
v = vlan(pcp, cfi, vid, ethertype)
def setUp(self):
pass
def tearDown(self):
pass
def find_protocol(self, pkt, name):
for p in pkt.protocols:
if p.protocol_name == name:
return p
def test_init(self):
eq_(self.pcp, self.v.pcp)
eq_(self.cfi, self.v.cfi)
eq_(self.vid, self.v.vid)
eq_(self.ethertype, self.v.ethertype)
eq_(self.length, self.v.length)
def test_parser(self):
res, ptype = self.v.parser(self.buf)
eq_(res.pcp, self.pcp)
eq_(res.cfi, self.cfi)
eq_(res.vid, self.vid)
eq_(res.ethertype, self.ethertype)
eq_(res.length, self.length)
eq_(ptype, ipv4)
def test_serialize(self):
data = bytearray()
prev = None
buf = self.v.serialize(data, prev)
fmt = vlan._PACK_STR
res = struct.unpack(fmt, buf)
eq_(res[0], self.tci)
eq_(res[1], self.ethertype)
def _build_vlan(self):
src_mac = mac.haddr_to_bin('00:07:0d:af:f4:54')
dst_mac = mac.haddr_to_bin('00:00:00:00:00:00')
ethertype = ether.ETH_TYPE_8021Q
e = ethernet(dst_mac, src_mac, ethertype)
version = 4
header_length = 20
tos = 0
total_length = 24
identification = 0x8a5d
flags = 0
offset = 1480
ttl = 64
proto = inet.IPPROTO_ICMP
csum = 0xa7f2
src = int(netaddr.IPAddress('131.151.32.21'))
dst = int(netaddr.IPAddress('131.151.32.129'))
option = 'TEST'
ip = ipv4(version, header_length, tos, total_length, identification,
flags, offset, ttl, proto, csum, src, dst, option)
p = Packet()
p.add_protocol(e)
p.add_protocol(self.v)
p.add_protocol(ip)
p.serialize()
return p
def test_build_vlan(self):
p = self._build_vlan()
e = self.find_protocol(p, "ethernet")
ok_(e)
eq_(e.ethertype, ether.ETH_TYPE_8021Q)
v = self.find_protocol(p, "vlan")
ok_(v)
eq_(v.ethertype, ether.ETH_TYPE_IP)
ip = self.find_protocol(p, "ipv4")
ok_(ip)
eq_(v.pcp, self.pcp)
eq_(v.cfi, self.cfi)
eq_(v.vid, self.vid)
eq_(v.ethertype, self.ethertype)
eq_(v.length, self.length)

View File

@ -2,3 +2,4 @@ coverage
nose nose
pep8 pep8
pylint==0.25.0 pylint==0.25.0
netaddr