addrconv: use bytes instead of int to represent ipv4 addresses

change ipv4 representation from int to bytes in many places.
replace homegrown bin<->text routines with addrconv for ipv4.

Signed-off-by: YAMAMOTO Takashi <yamamoto@valinux.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
YAMAMOTO Takashi 2013-07-23 12:54:10 +09:00 committed by FUJITA Tomonori
parent faf13ff99f
commit d31b5dd367
13 changed files with 48 additions and 60 deletions

View File

@ -1,30 +1,19 @@
from ryu.lib import addrconv
def ipv4_arg_to_bin(w, x, y, z):
"""Generate unsigned int from components of IP address
returns: w << 24 | x << 16 | y << 8 | z"""
return (w << 24) | (x << 16) | (y << 8) | z
def ipv4_to_bin(ip):
'''
Parse an IP address and return an unsigned int.
The IP address is in dotted decimal notation.
'''
args = [int(arg) for arg in ip.split('.')]
return ipv4_arg_to_bin(*args)
return addrconv.ipv4.text_to_bin(ip)
def ipv4_to_str(ip):
"""Generate IP address string from an unsigned int.
ip: unsigned int of form w << 24 | x << 16 | y << 8 | z
returns: ip address string w.x.y.z"""
w = (ip >> 24) & 0xff
x = (ip >> 16) & 0xff
y = (ip >> 8) & 0xff
z = ip & 0xff
return "%i.%i.%i.%i" % (w, x, y, z)
return addrconv.ipv4.bin_to_text(ip)
def ipv6_to_bin(ipv6):

View File

@ -51,7 +51,7 @@ class arp(packet_base.PacketBase):
============== ====================
"""
_PACK_STR = '!HHBBH6sI6sI'
_PACK_STR = '!HHBBH6s4s6s4s'
_MIN_LEN = struct.calcsize(_PACK_STR)
def __init__(self, hwtype=ARP_HW_TYPE_ETHERNET, proto=ether.ETH_TYPE_IP,

View File

@ -26,7 +26,7 @@ from ryu.lib import ip
IPV4_ADDRESS_PACK_STR = '!I'
IPV4_ADDRESS_LEN = struct.calcsize(IPV4_ADDRESS_PACK_STR)
IPV4_PSEUDO_HEADER_PACK_STR = '!II2xHH'
IPV4_PSEUDO_HEADER_PACK_STR = '!4s4s2xHH'
class ipv4(packet_base.PacketBase):
@ -62,7 +62,7 @@ class ipv4(packet_base.PacketBase):
============== ====================
"""
_PACK_STR = '!BBHHHBBHII'
_PACK_STR = '!BBHHHBBH4s4s'
_MIN_LEN = struct.calcsize(_PACK_STR)
def __init__(self, version=4, header_length=5, tos=0,

View File

@ -35,7 +35,7 @@ def checksum(data):
# avoid circular import
_IPV4_PSEUDO_HEADER_PACK_STR = '!IIxBH'
_IPV4_PSEUDO_HEADER_PACK_STR = '!4s4sxBH'
_IPV6_PSEUDO_HEADER_PACK_STR = '!16s16sI3xB'

View File

@ -88,7 +88,7 @@ VRRP_IPV4_SRC_MAC_ADDRESS = netaddr.EUI(VRRP_IPV4_SRC_MAC_ADDRESS_STR).packed
VRRP_IPV4_DST_MAC_ADDRESS_STR = '01:00:5E:00:00:12'
VRRP_IPV4_DST_MAC_ADDRESS = netaddr.EUI(VRRP_IPV4_DST_MAC_ADDRESS_STR).packed
VRRP_IPV4_DST_ADDRESS_STR = '224.0.0.18'
VRRP_IPV4_DST_ADDRESS = netaddr.IPAddress(VRRP_IPV4_DST_ADDRESS_STR).value
VRRP_IPV4_DST_ADDRESS = netaddr.IPAddress(VRRP_IPV4_DST_ADDRESS_STR).packed
VRRP_IPV4_TTL = 255
@ -166,10 +166,9 @@ VRRP_V2_MAX_ADVER_INT_MAX = 0xff
def is_ipv6(ip_address):
if type(ip_address) == int:
return False
assert type(ip_address) == str
if len(ip_address) == 4:
return False
assert len(ip_address) == 16
return True
@ -203,7 +202,7 @@ class vrrp(packet_base.PacketBase):
"""
_VERSION_PACK_STR = '!B'
_IPV4_ADDRESS_PACK_STR_RAW = 'I'
_IPV4_ADDRESS_PACK_STR_RAW = '4s'
_IPV4_ADDRESS_PACK_STR = '!' + _IPV4_ADDRESS_PACK_STR_RAW
_IPV4_ADDRESS_LEN = struct.calcsize(_IPV4_ADDRESS_PACK_STR)
_IPV6_ADDRESS_LEN = 16
@ -580,7 +579,8 @@ class vrrpv3(vrrp):
@staticmethod
def serialize_static(vrrp_, prev):
if isinstance(prev, ipv4.ipv4):
assert type(vrrp_.ip_addresses[0]) == int
assert type(vrrp_.ip_addresses[0]) == str
assert len(vrrp_.ip_addresses[0]) == 4
ip_address_pack_raw = vrrpv3._IPV4_ADDRESS_PACK_STR_RAW
elif isinstance(prev, ipv6.ipv6):
assert type(vrrp_.ip_addresses[0]) == str

View File

@ -26,11 +26,11 @@ class Test_ip(unittest.TestCase):
ipv4_str = '10.28.197.1'
val = 0x0a1cc501
res = ip.ipv4_to_bin(ipv4_str)
(res,) = struct.unpack('!I', ip.ipv4_to_bin(ipv4_str))
eq_(val, res)
def test_ipv4_to_str(self):
ipv4_bin = 0x0a1cc501
ipv4_bin = struct.pack('!I', 0x0a1cc501)
val = '10.28.197.1'
res = ip.ipv4_to_str(ipv4_bin)

View File

@ -43,9 +43,9 @@ class Test_arp(unittest.TestCase):
plen = 4
opcode = 1
src_mac = mac.haddr_to_bin('00:07:0d:af:f4:54')
src_ip = int(netaddr.IPAddress('24.166.172.1'))
src_ip = netaddr.IPAddress('24.166.172.1').packed
dst_mac = mac.haddr_to_bin('00:00:00:00:00:00')
dst_ip = int(netaddr.IPAddress('24.166.173.159'))
dst_ip = netaddr.IPAddress('24.166.173.159').packed
fmt = arp._PACK_STR
buf = pack(fmt, hwtype, proto, hlen, plen, opcode, src_mac, src_ip,

View File

@ -50,8 +50,8 @@ class Test_ipv4(unittest.TestCase):
ttl = 64
proto = inet.IPPROTO_TCP
csum = 0xadc6
src = int(netaddr.IPAddress('131.151.32.21'))
dst = int(netaddr.IPAddress('131.151.32.129'))
src = netaddr.IPAddress('131.151.32.21').packed
dst = netaddr.IPAddress('131.151.32.129').packed
length = header_length * 4
option = '\x86\x28\x00\x00\x00\x01\x01\x22' \
+ '\x00\x01\xae\x00\x00\x00\x00\x00' \

View File

@ -36,10 +36,8 @@ class TestPacket(unittest.TestCase):
dst_mac = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA')
src_mac = mac.haddr_to_bin('BB:BB:BB:BB:BB:BB')
dst_ip = int(netaddr.IPAddress('192.168.128.10'))
dst_ip_bin = struct.pack('!I', dst_ip)
src_ip = int(netaddr.IPAddress('192.168.122.20'))
src_ip_bin = struct.pack('!I', src_ip)
dst_ip_bin = dst_ip = netaddr.IPAddress('192.168.128.10').packed
src_ip_bin = src_ip = netaddr.IPAddress('192.168.122.20').packed
payload = '\x06\x06\x47\x50\x00\x00\x00\x00' \
+ '\xcd\xc5\x00\x00\x00\x00\x00\x00' \
+ '\x10\x11\x12\x13\x14\x15\x16\x17' \
@ -262,7 +260,7 @@ class TestPacket(unittest.TestCase):
eq_(0x77b2, p_udp.csum)
t = bytearray(u_buf)
struct.pack_into('!H', t, 6, p_udp.csum)
ph = struct.pack('!IIBBH', self.src_ip, self.dst_ip, 0,
ph = struct.pack('!4s4sBBH', self.src_ip, self.dst_ip, 0,
17, len(u_buf) + len(self.payload))
t = ph + t + self.payload
eq_(packet_utils.checksum(t), 0)
@ -361,7 +359,7 @@ class TestPacket(unittest.TestCase):
eq_(len(t_buf), len(p_tcp))
t = bytearray(t_buf)
struct.pack_into('!H', t, 16, p_tcp.csum)
ph = struct.pack('!IIBBH', self.src_ip, self.dst_ip, 0,
ph = struct.pack('!4s4sBBH', self.src_ip, self.dst_ip, 0,
6, len(t_buf) + len(self.payload))
t = ph + t + self.payload
eq_(packet_utils.checksum(t), 0)

View File

@ -92,8 +92,8 @@ class Test_tcp(unittest.TestCase):
offset = 5
csum = 0
src_ip = int(netaddr.IPAddress('192.168.10.1'))
dst_ip = int(netaddr.IPAddress('192.168.100.1'))
src_ip = netaddr.IPAddress('192.168.10.1').packed
dst_ip = netaddr.IPAddress('192.168.100.1').packed
prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
inet.IPPROTO_TCP, 0, src_ip, dst_ip)
@ -112,7 +112,7 @@ class Test_tcp(unittest.TestCase):
eq_(res[8], self.urgent)
# checksum
ph = struct.pack('!IIBBH', src_ip, dst_ip, 0, 6, offset * 4)
ph = struct.pack('!4s4sBBH', src_ip, dst_ip, 0, 6, offset * 4)
d = ph + buf + bytearray()
s = packet_utils.checksum(d)
eq_(0, s)
@ -122,8 +122,8 @@ class Test_tcp(unittest.TestCase):
csum = 0
option = '\x01\x02'
src_ip = int(netaddr.IPAddress('192.168.10.1'))
dst_ip = int(netaddr.IPAddress('192.168.100.1'))
src_ip = netaddr.IPAddress('192.168.10.1').packed
dst_ip = netaddr.IPAddress('192.168.100.1').packed
prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
inet.IPPROTO_TCP, 0, src_ip, dst_ip)

View File

@ -71,8 +71,8 @@ class Test_udp(unittest.TestCase):
total_length = 0
csum = 0
src_ip = int(netaddr.IPAddress('192.168.10.1'))
dst_ip = int(netaddr.IPAddress('192.168.100.1'))
src_ip = netaddr.IPAddress('192.168.10.1').packed
dst_ip = netaddr.IPAddress('192.168.100.1').packed
prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
inet.IPPROTO_UDP, 0, src_ip, dst_ip)
@ -85,7 +85,7 @@ class Test_udp(unittest.TestCase):
eq_(res[2], struct.calcsize(udp._PACK_STR))
# checksum
ph = struct.pack('!IIBBH', src_ip, dst_ip, 0, 17, res[2])
ph = struct.pack('!4s4sBBH', src_ip, dst_ip, 0, 17, res[2])
d = ph + buf + bytearray()
s = packet_utils.checksum(d)
eq_(0, s)

View File

@ -100,8 +100,8 @@ class Test_vlan(unittest.TestCase):
ttl = 64
proto = inet.IPPROTO_ICMP
csum = 0xa7f2
src = int(netaddr.IPAddress('131.151.32.21'))
dst = int(netaddr.IPAddress('131.151.32.129'))
src = netaddr.IPAddress('131.151.32.21').packed
dst = netaddr.IPAddress('131.151.32.129').packed
option = 'TEST'
ip = ipv4(version, header_length, tos, total_length, identification,
flags, offset, ttl, proto, csum, src, dst, option)

View File

@ -46,11 +46,11 @@ class Test_vrrpv2(unittest.TestCase):
auth_type = vrrp.VRRP_AUTH_NO_AUTH
max_adver_int = 100
checksum = 0
ip_address = netaddr.IPAddress('192.168.0.1').value
ip_address = netaddr.IPAddress('192.168.0.1').packed
auth_data = (0, 0)
vrrpv2 = vrrp.vrrpv2.create(type_, vrid, priority, max_adver_int,
[ip_address])
buf = struct.pack(vrrp.vrrpv2._PACK_STR + 'III',
buf = struct.pack(vrrp.vrrpv2._PACK_STR + '4sII',
vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V2, type_),
vrid, priority, count_ip,
auth_type, max_adver_int, checksum, ip_address,
@ -84,12 +84,12 @@ class Test_vrrpv2(unittest.TestCase):
eq_(self.max_adver_int, vrrpv2.max_adver_int)
eq_(self.checksum, vrrpv2.checksum)
eq_(1, len(vrrpv2.ip_addresses))
eq_(int, type(vrrpv2.ip_addresses[0]))
eq_(str, type(vrrpv2.ip_addresses[0]))
eq_(self.ip_address, vrrpv2.ip_addresses[0])
eq_(self.auth_data, vrrpv2.auth_data)
def test_serialize(self):
src_ip = netaddr.IPAddress('192.168.0.1').value
src_ip = netaddr.IPAddress('192.168.0.1').packed
dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
inet.IPPROTO_VRRP, 0, src_ip, dst_ip)
@ -98,14 +98,14 @@ class Test_vrrpv2(unittest.TestCase):
vrid = 5
priority = 10
max_adver_int = 30
ip_address = netaddr.IPAddress('192.168.0.2').value
ip_address = netaddr.IPAddress('192.168.0.2').packed
ip_addresses = [ip_address]
vrrp_ = vrrp.vrrpv2.create(
type_, vrid, priority, max_adver_int, ip_addresses)
buf = vrrp_.serialize(bytearray(), prev)
pack_str = vrrp.vrrpv2._PACK_STR + 'III'
pack_str = vrrp.vrrpv2._PACK_STR + '4sII'
pack_len = struct.calcsize(pack_str)
res = struct.unpack(pack_str, str(buf))
eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V2, type_))
@ -130,7 +130,7 @@ class Test_vrrpv2(unittest.TestCase):
vrrp.vrrp.parser(m_short_buf)
def test_create_packet(self):
primary_ip = netaddr.IPAddress('192.168.0.2').value
primary_ip = netaddr.IPAddress('192.168.0.2').packed
p0 = self.vrrpv2.create_packet(primary_ip)
p0.serialize()
p1 = packet.Packet(str(p0.data))
@ -193,10 +193,10 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
count_ip = 1
max_adver_int = 111
checksum = 0
ip_address = netaddr.IPAddress('192.168.0.1').value
ip_address = netaddr.IPAddress('192.168.0.1').packed
vrrpv3 = vrrp.vrrpv3.create(type_, vrid, priority, max_adver_int,
[ip_address])
buf = struct.pack(vrrp.vrrpv3._PACK_STR + 'I',
buf = struct.pack(vrrp.vrrpv3._PACK_STR + '4s',
vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_),
vrid, priority, count_ip,
max_adver_int, checksum, ip_address)
@ -226,11 +226,11 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
eq_(self.max_adver_int, vrrpv3.max_adver_int)
eq_(self.checksum, vrrpv3.checksum)
eq_(1, len(vrrpv3.ip_addresses))
eq_(int, type(vrrpv3.ip_addresses[0]))
eq_(str, type(vrrpv3.ip_addresses[0]))
eq_(self.ip_address, vrrpv3.ip_addresses[0])
def test_serialize(self):
src_ip = netaddr.IPAddress('192.168.0.1').value
src_ip = netaddr.IPAddress('192.168.0.1').packed
dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
inet.IPPROTO_VRRP, 0, src_ip, dst_ip)
@ -239,7 +239,7 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
vrid = 5
priority = 10
max_adver_int = 30
ip_address = netaddr.IPAddress('192.168.0.2').value
ip_address = netaddr.IPAddress('192.168.0.2').packed
ip_addresses = [ip_address]
vrrp_ = vrrp.vrrpv3.create(
@ -247,7 +247,7 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
buf = vrrp_.serialize(bytearray(), prev)
print(len(buf), type(buf), buf)
pack_str = vrrp.vrrpv3._PACK_STR + 'I'
pack_str = vrrp.vrrpv3._PACK_STR + '4s'
pack_len = struct.calcsize(pack_str)
res = struct.unpack(pack_str, str(buf))
eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_))
@ -261,7 +261,8 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
print(res)
# checksum
ph = struct.pack('!IIxBH', src_ip, dst_ip, inet.IPPROTO_VRRP, pack_len)
ph = struct.pack('!4s4sxBH', src_ip, dst_ip, inet.IPPROTO_VRRP,
pack_len)
s = packet_utils.checksum(ph + buf)
eq_(0, s)
@ -271,7 +272,7 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
vrrp.vrrp.parser(m_short_buf)
def test_create_packet(self):
primary_ip = netaddr.IPAddress('192.168.0.2').value
primary_ip = netaddr.IPAddress('192.168.0.2').packed
p0 = self.vrrpv3.create_packet(primary_ip)
p0.serialize()
p1 = packet.Packet(str(p0.data))