packet lib: icmp: rewrite unittest
formerly this module tested only to_string(), so a bug in parser() of 'TimeExceeded' was overlooked. this patch will bring the module to test all the methods of all the classes in 'icmp' module. Signed-off-by: itoyuichi <ito.yuichi0@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
91e54cda62
commit
54989c7862
@ -17,47 +17,186 @@
|
||||
import unittest
|
||||
import inspect
|
||||
import logging
|
||||
import struct
|
||||
|
||||
from nose.tools import *
|
||||
from nose.plugins.skip import Skip, SkipTest
|
||||
from nose.tools import eq_
|
||||
from ryu.lib.packet import icmp
|
||||
from ryu.lib.packet import packet_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Test_icmp_dest_unreach(unittest.TestCase):
|
||||
class Test_icmp(unittest.TestCase):
|
||||
|
||||
type_ = icmp.ICMP_DEST_UNREACH
|
||||
code = icmp.ICMP_HOST_UNREACH_CODE
|
||||
csum = 0
|
||||
echo_id = None
|
||||
echo_seq = None
|
||||
echo_data = None
|
||||
|
||||
mtu = 10
|
||||
data = 'abc'
|
||||
data_len = len(data)
|
||||
dst_unreach = icmp.dest_unreach(data_len=data_len, mtu=mtu, data=data)
|
||||
unreach_mtu = None
|
||||
unreach_data = None
|
||||
unreach_data_len = None
|
||||
|
||||
ic = icmp.icmp(type_, code, csum, data=dst_unreach)
|
||||
te_data = None
|
||||
te_data_len = None
|
||||
|
||||
def setUp(self):
|
||||
pass
|
||||
self.type_ = icmp.ICMP_ECHO_REQUEST
|
||||
self.code = 0
|
||||
self.csum = 0
|
||||
self.data = None
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
|
||||
|
||||
self.buf = bytearray(struct.pack(
|
||||
icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
|
||||
self.csum_calc = packet_utils.checksum(str(self.buf))
|
||||
struct.pack_into('!H', self.buf, 2, self.csum_calc)
|
||||
|
||||
def setUp_with_echo(self):
|
||||
self.echo_id = 13379
|
||||
self.echo_seq = 1
|
||||
self.echo_data = '\x30\x0e\x09\x00\x00\x00\x00\x00' \
|
||||
+ '\x10\x11\x12\x13\x14\x15\x16\x17' \
|
||||
+ '\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \
|
||||
+ '\x20\x21\x22\x23\x24\x25\x26\x27' \
|
||||
+ '\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f' \
|
||||
+ '\x30\x31\x32\x33\x34\x35\x36\x37'
|
||||
self.data = icmp.echo(
|
||||
id_=self.echo_id, seq=self.echo_seq, data=self.echo_data)
|
||||
|
||||
self.type_ = icmp.ICMP_ECHO_REQUEST
|
||||
self.code = 0
|
||||
self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
|
||||
|
||||
self.buf = struct.pack(
|
||||
icmp.icmp._PACK_STR, self.type_, self.code, self.csum)
|
||||
self.buf += self.data.serialize()
|
||||
self.csum_calc = packet_utils.checksum(str(self.buf))
|
||||
struct.pack_into('!H', self.buf, 2, self.csum_calc)
|
||||
|
||||
def setUp_with_dest_unreach(self):
|
||||
self.unreach_mtu = 10
|
||||
self.unreach_data = 'abc'
|
||||
self.unreach_data_len = len(self.unreach_data)
|
||||
self.data = icmp.dest_unreach(
|
||||
data_len=self.unreach_data_len, mtu=self.unreach_mtu,
|
||||
data=self.unreach_data)
|
||||
|
||||
self.type_ = icmp.ICMP_DEST_UNREACH
|
||||
self.code = icmp.ICMP_HOST_UNREACH_CODE
|
||||
self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
|
||||
|
||||
self.buf = struct.pack(
|
||||
icmp.icmp._PACK_STR, self.type_, self.code, self.csum)
|
||||
self.buf += self.data.serialize()
|
||||
self.csum_calc = packet_utils.checksum(str(self.buf))
|
||||
struct.pack_into('!H', self.buf, 2, self.csum_calc)
|
||||
|
||||
def setUp_with_TimeExceeded(self):
|
||||
self.te_data = 'abc'
|
||||
self.te_data_len = len(self.te_data)
|
||||
self.data = icmp.TimeExceeded(
|
||||
data_len=self.te_data_len, data=self.te_data)
|
||||
|
||||
self.type_ = icmp.ICMP_TIME_EXCEEDED
|
||||
self.code = 0
|
||||
self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
|
||||
|
||||
self.buf = struct.pack(
|
||||
icmp.icmp._PACK_STR, self.type_, self.code, self.csum)
|
||||
self.buf += self.data.serialize()
|
||||
self.csum_calc = packet_utils.checksum(str(self.buf))
|
||||
struct.pack_into('!H', self.buf, 2, self.csum_calc)
|
||||
|
||||
def test_init(self):
|
||||
eq_(self.type_, self.ic.type)
|
||||
eq_(self.code, self.ic.code)
|
||||
eq_(self.csum, self.ic.csum)
|
||||
eq_(str(self.data), str(self.ic.data))
|
||||
|
||||
def test_init_with_echo(self):
|
||||
self.setUp_with_echo()
|
||||
self.test_init()
|
||||
|
||||
def test_init_with_dest_unreach(self):
|
||||
self.setUp_with_dest_unreach()
|
||||
self.test_init()
|
||||
|
||||
def test_init_with_TimeExceeded(self):
|
||||
self.setUp_with_TimeExceeded()
|
||||
self.test_init()
|
||||
|
||||
def test_parser(self):
|
||||
_res = icmp.icmp.parser(str(self.buf))
|
||||
if type(_res) is tuple:
|
||||
res = _res[0]
|
||||
else:
|
||||
res = _res
|
||||
|
||||
eq_(self.type_, res.type)
|
||||
eq_(self.code, res.code)
|
||||
eq_(self.csum_calc, res.csum)
|
||||
eq_(str(self.data), str(res.data))
|
||||
|
||||
def test_parser_with_echo(self):
|
||||
self.setUp_with_echo()
|
||||
self.test_parser()
|
||||
|
||||
def test_parser_with_dest_unreach(self):
|
||||
self.setUp_with_dest_unreach()
|
||||
self.test_parser()
|
||||
|
||||
def test_parser_with_TimeExceeded(self):
|
||||
self.setUp_with_TimeExceeded()
|
||||
self.test_parser()
|
||||
|
||||
def test_serialize(self):
|
||||
data = bytearray()
|
||||
prev = None
|
||||
buf = self.ic.serialize(data, prev)
|
||||
|
||||
res = struct.unpack_from(icmp.icmp._PACK_STR, str(buf))
|
||||
|
||||
eq_(self.type_, res[0])
|
||||
eq_(self.code, res[1])
|
||||
eq_(self.csum_calc, res[2])
|
||||
|
||||
def test_serialize_with_echo(self):
|
||||
self.setUp_with_echo()
|
||||
self.test_serialize()
|
||||
|
||||
data = bytearray()
|
||||
prev = None
|
||||
buf = self.ic.serialize(data, prev)
|
||||
echo = icmp.echo.parser(str(buf), icmp.icmp._MIN_LEN)
|
||||
eq_(repr(self.data), repr(echo))
|
||||
|
||||
def test_serialize_with_dest_unreach(self):
|
||||
self.setUp_with_dest_unreach()
|
||||
self.test_serialize()
|
||||
|
||||
data = bytearray()
|
||||
prev = None
|
||||
buf = self.ic.serialize(data, prev)
|
||||
unreach = icmp.dest_unreach.parser(str(buf), icmp.icmp._MIN_LEN)
|
||||
eq_(repr(self.data), repr(unreach))
|
||||
|
||||
def test_serialize_with_TimeExceeded(self):
|
||||
self.setUp_with_TimeExceeded()
|
||||
self.test_serialize()
|
||||
|
||||
data = bytearray()
|
||||
prev = None
|
||||
buf = self.ic.serialize(data, prev)
|
||||
te = icmp.TimeExceeded.parser(str(buf), icmp.icmp._MIN_LEN)
|
||||
eq_(repr(self.data), repr(te))
|
||||
|
||||
def test_to_string(self):
|
||||
data_values = {'data': self.data,
|
||||
'data_len': self.data_len,
|
||||
'mtu': self.mtu}
|
||||
_data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
|
||||
for k, v in inspect.getmembers(self.dst_unreach)
|
||||
if k in data_values])
|
||||
data_str = '%s(%s)' % (icmp.dest_unreach.__name__, _data_str)
|
||||
|
||||
icmp_values = {'type': repr(self.type_),
|
||||
'code': repr(self.code),
|
||||
'csum': repr(self.csum),
|
||||
'data': data_str}
|
||||
'data': repr(self.data)}
|
||||
_ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
|
||||
for k, v in inspect.getmembers(self.ic)
|
||||
if k in icmp_values])
|
||||
@ -65,3 +204,118 @@ class Test_icmp_dest_unreach(unittest.TestCase):
|
||||
|
||||
eq_(str(self.ic), ic_str)
|
||||
eq_(repr(self.ic), ic_str)
|
||||
|
||||
def test_to_string_with_echo(self):
|
||||
self.setUp_with_echo()
|
||||
self.test_to_string()
|
||||
|
||||
def test_to_string_with_dest_unreach(self):
|
||||
self.setUp_with_dest_unreach()
|
||||
self.test_to_string()
|
||||
|
||||
def test_to_string_with_TimeExceeded(self):
|
||||
self.setUp_with_TimeExceeded()
|
||||
self.test_to_string()
|
||||
|
||||
|
||||
class Test_echo(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.id_ = 13379
|
||||
self.seq = 1
|
||||
self.data = '\x30\x0e\x09\x00\x00\x00\x00\x00' \
|
||||
+ '\x10\x11\x12\x13\x14\x15\x16\x17' \
|
||||
+ '\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \
|
||||
+ '\x20\x21\x22\x23\x24\x25\x26\x27' \
|
||||
+ '\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f' \
|
||||
+ '\x30\x31\x32\x33\x34\x35\x36\x37'
|
||||
self.echo = icmp.echo(
|
||||
self.id_, self.seq, self.data)
|
||||
self.buf = struct.pack('!HH', self.id_, self.seq)
|
||||
self.buf += self.data
|
||||
|
||||
def test_init(self):
|
||||
eq_(self.id_, self.echo.id)
|
||||
eq_(self.seq, self.echo.seq)
|
||||
eq_(self.data, self.echo.data)
|
||||
|
||||
def test_parser(self):
|
||||
_res = icmp.echo.parser(self.buf, 0)
|
||||
if type(_res) is tuple:
|
||||
res = _res[0]
|
||||
else:
|
||||
res = _res
|
||||
eq_(self.id_, res.id)
|
||||
eq_(self.seq, res.seq)
|
||||
eq_(self.data, res.data)
|
||||
|
||||
def test_serialize(self):
|
||||
buf = self.echo.serialize()
|
||||
res = struct.unpack_from('!HH', str(buf))
|
||||
eq_(self.id_, res[0])
|
||||
eq_(self.seq, res[1])
|
||||
eq_(self.data, buf[struct.calcsize('!HH'):])
|
||||
|
||||
|
||||
class Test_dest_unreach(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mtu = 10
|
||||
self.data = 'abc'
|
||||
self.data_len = len(self.data)
|
||||
self.dest_unreach = icmp.dest_unreach(
|
||||
data_len=self.data_len, mtu=self.mtu, data=self.data)
|
||||
self.buf = struct.pack('!xBH', self.data_len, self.mtu)
|
||||
self.buf += self.data
|
||||
|
||||
def test_init(self):
|
||||
eq_(self.data_len, self.dest_unreach.data_len)
|
||||
eq_(self.mtu, self.dest_unreach.mtu)
|
||||
eq_(self.data, self.dest_unreach.data)
|
||||
|
||||
def test_parser(self):
|
||||
_res = icmp.dest_unreach.parser(self.buf, 0)
|
||||
if type(_res) is tuple:
|
||||
res = _res[0]
|
||||
else:
|
||||
res = _res
|
||||
eq_(self.data_len, res.data_len)
|
||||
eq_(self.mtu, res.mtu)
|
||||
eq_(self.data, res.data)
|
||||
|
||||
def test_serialize(self):
|
||||
buf = self.dest_unreach.serialize()
|
||||
res = struct.unpack_from('!xBH', str(buf))
|
||||
eq_(self.data_len, res[0])
|
||||
eq_(self.mtu, res[1])
|
||||
eq_(self.data, buf[struct.calcsize('!xBH'):])
|
||||
|
||||
|
||||
class Test_TimeExceeded(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.data = 'abc'
|
||||
self.data_len = len(self.data)
|
||||
self.te = icmp.TimeExceeded(
|
||||
data_len=self.data_len, data=self.data)
|
||||
self.buf = struct.pack('!xBxx', self.data_len)
|
||||
self.buf += self.data
|
||||
|
||||
def test_init(self):
|
||||
eq_(self.data_len, self.te.data_len)
|
||||
eq_(self.data, self.te.data)
|
||||
|
||||
def test_parser(self):
|
||||
_res = icmp.TimeExceeded.parser(self.buf, 0)
|
||||
if type(_res) is tuple:
|
||||
res = _res[0]
|
||||
else:
|
||||
res = _res
|
||||
eq_(self.data_len, res.data_len)
|
||||
eq_(self.data, res.data)
|
||||
|
||||
def test_serialize(self):
|
||||
buf = self.te.serialize()
|
||||
res = struct.unpack_from('!xBxx', str(buf))
|
||||
eq_(self.data_len, res[0])
|
||||
eq_(self.data, buf[struct.calcsize('!xBxx'):])
|
||||
|
Loading…
Reference in New Issue
Block a user