packet lib: change the api to use text addresses

for example:
    >>> from ryu.lib.packet.ipv4 import ipv4
    >>> o = ipv4(src='127.0.0.1')
    >>> o.src
    '127.0.0.1'
    >>>

i left lldp TLVs as they seem to be treated opaque.

for now, i don't change mac.DONTCARE and mac.BROADCAST because
they are used by the ofproto world as well.

Signed-off-by: YAMAMOTO Takashi <yamamoto@valinux.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
YAMAMOTO Takashi 2013-08-01 13:54:57 +09:00 committed by FUJITA Tomonori
parent 84236ab88d
commit c14a4182d3
22 changed files with 242 additions and 189 deletions

View File

@ -23,8 +23,10 @@ HADDR_PATTERN = r'([0-9a-f]{2}:){5}[0-9a-f]{2}'
DONTCARE = '\x00' * 6
BROADCAST = '\xff' * 6
MULTICAST = '\xfe' + '\xff' * 5
UNICAST = '\x01' + '\x00' * 5
DONTCARE_STR = '00:00:00:00:00:00'
BROADCAST_STR = 'ff:ff:ff:ff:ff:ff'
MULTICAST = 'fe:ff:ff:ff:ff:ff'
UNICAST = '01:00:00:00:00:00'
def is_multicast(addr):

View File

@ -16,8 +16,7 @@
import struct
from ryu.ofproto import ether
from ryu.lib import ip
from ryu.lib import mac
from ryu.lib import addrconv
from . import packet_base
ARP_HW_TYPE_ETHERNET = 1 # ethernet hardware type
@ -56,10 +55,10 @@ class arp(packet_base.PacketBase):
def __init__(self, hwtype=ARP_HW_TYPE_ETHERNET, proto=ether.ETH_TYPE_IP,
hlen=6, plen=4, opcode=ARP_REQUEST,
src_mac=mac.haddr_to_bin('ff:ff:ff:ff:ff:ff'),
src_ip=ip.ipv4_to_bin('0.0.0.0'),
dst_mac=mac.haddr_to_bin('ff:ff:ff:ff:ff:ff'),
dst_ip=ip.ipv4_to_bin('0.0.0.0')):
src_mac='ff:ff:ff:ff:ff:ff',
src_ip='0.0.0.0',
dst_mac='ff:ff:ff:ff:ff:ff',
dst_ip='0.0.0.0'):
super(arp, self).__init__()
self.hwtype = hwtype
self.proto = proto
@ -75,14 +74,19 @@ class arp(packet_base.PacketBase):
def parser(cls, buf):
(hwtype, proto, hlen, plen, opcode, src_mac, src_ip,
dst_mac, dst_ip) = struct.unpack_from(cls._PACK_STR, buf)
return cls(hwtype, proto, hlen, plen, opcode, src_mac, src_ip,
dst_mac, dst_ip), None, buf[arp._MIN_LEN:]
return cls(hwtype, proto, hlen, plen, opcode,
addrconv.mac.bin_to_text(src_mac),
addrconv.ipv4.bin_to_text(src_ip),
addrconv.mac.bin_to_text(dst_mac),
addrconv.ipv4.bin_to_text(dst_ip)), None, buf[arp._MIN_LEN:]
def serialize(self, payload, prev):
return struct.pack(arp._PACK_STR, self.hwtype, self.proto,
self.hlen, self.plen, self.opcode,
self.src_mac, self.src_ip, self.dst_mac,
self.dst_ip)
addrconv.mac.text_to_bin(self.src_mac),
addrconv.ipv4.text_to_bin(self.src_ip),
addrconv.mac.text_to_bin(self.dst_mac),
addrconv.ipv4.text_to_bin(self.dst_ip))
def arp_ip(opcode, src_mac, src_ip, dst_mac, dst_ip):

View File

@ -130,7 +130,7 @@ Rapid Spanning Tree BPDUs(RST BPDUs) format
import binascii
import struct
from . import packet_base
from ryu.lib.mac import haddr_to_bin
from ryu.lib import addrconv
# BPDU destination
@ -243,10 +243,10 @@ class ConfigurationBPDUs(bpdu):
def __init__(self, flags=0, root_priority=DEFAULT_BRIDGE_PRIORITY,
root_system_id_extension=0,
root_mac_address=haddr_to_bin('00:00:00:00:00:00'),
root_mac_address='00:00:00:00:00:00',
root_path_cost=0, bridge_priority=DEFAULT_BRIDGE_PRIORITY,
bridge_system_id_extension=0,
bridge_mac_address=haddr_to_bin('00:00:00:00:00:00'),
bridge_mac_address='00:00:00:00:00:00',
port_priority=DEFAULT_PORT_PRIORITY, port_number=0,
message_age=0, max_age=DEFAULT_MAX_AGE,
hello_time=DEFAULT_HELLO_TIME,
@ -336,13 +336,15 @@ class ConfigurationBPDUs(bpdu):
mac_addr_list = [format((mac_addr >> (8 * i)) & 0xff, '02x')
for i in range(0, 6)]
mac_addr_list.reverse()
mac_address = binascii.a2b_hex(''.join(mac_addr_list))
mac_address_bin = binascii.a2b_hex(''.join(mac_addr_list))
mac_address = addrconv.mac.bin_to_text(mac_address_bin)
return priority, system_id_extension, mac_address
@staticmethod
def encode_bridge_id(priority, system_id_extension, mac_address):
mac_addr = int(binascii.hexlify(mac_address), 16)
mac_addr = int(binascii.hexlify(addrconv.mac.text_to_bin(mac_address)),
16)
return ((priority + system_id_extension) << 48) + mac_addr
@staticmethod
@ -429,10 +431,10 @@ class RstBPDUs(ConfigurationBPDUs):
def __init__(self, flags=0, root_priority=DEFAULT_BRIDGE_PRIORITY,
root_system_id_extension=0,
root_mac_address=haddr_to_bin('00:00:00:00:00:00'),
root_mac_address='00:00:00:00:00:00',
root_path_cost=0, bridge_priority=DEFAULT_BRIDGE_PRIORITY,
bridge_system_id_extension=0,
bridge_mac_address=haddr_to_bin('00:00:00:00:00:00'),
bridge_mac_address='00:00:00:00:00:00',
port_priority=DEFAULT_PORT_PRIORITY, port_number=0,
message_age=0, max_age=DEFAULT_MAX_AGE,
hello_time=DEFAULT_HELLO_TIME,

View File

@ -18,6 +18,7 @@ from . import packet_base
from . import vlan
from . import mpls
from ryu.ofproto import ether
from ryu.lib import addrconv
class ethernet(packet_base.PacketBase):
@ -47,11 +48,15 @@ class ethernet(packet_base.PacketBase):
@classmethod
def parser(cls, buf):
dst, src, ethertype = struct.unpack_from(cls._PACK_STR, buf)
return (cls(dst, src, ethertype), ethernet.get_packet_type(ethertype),
return (cls(addrconv.mac.bin_to_text(dst),
addrconv.mac.bin_to_text(src), ethertype),
ethernet.get_packet_type(ethertype),
buf[ethernet._MIN_LEN:])
def serialize(self, payload, prev):
return struct.pack(ethernet._PACK_STR, self.dst, self.src,
return struct.pack(ethernet._PACK_STR,
addrconv.mac.text_to_bin(self.dst),
addrconv.mac.text_to_bin(self.src),
self.ethertype)
@classmethod

View File

@ -20,7 +20,7 @@ import binascii
from . import packet_base
from . import packet_utils
from ryu.lib.mac import haddr_to_bin, haddr_to_str
from ryu.lib import addrconv
ICMPV6_DST_UNREACH = 1 # dest unreachable, codes:
ICMPV6_PACKET_TOO_BIG = 2 # packet too big
@ -181,7 +181,7 @@ class nd_neighbor(object):
@classmethod
def parser(cls, buf, offset):
(res, dst) = struct.unpack_from(cls._PACK_STR, buf, offset)
msg = cls(res >> 29, dst)
msg = cls(res >> 29, addrconv.ipv6.bin_to_text(dst))
offset += cls._MIN_LEN
if len(buf) > offset:
(msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset)
@ -195,7 +195,8 @@ class nd_neighbor(object):
return msg
def serialize(self):
hdr = bytearray(struct.pack(nd_neighbor._PACK_STR, self.res, self.dst))
hdr = bytearray(struct.pack(nd_neighbor._PACK_STR, self.res,
addrconv.ipv6.text_to_bin(self.dst)))
if self.type_ is not None:
hdr += bytearray(struct.pack('!BB', self.type_, self.length))
@ -244,7 +245,7 @@ class nd_option_la(object):
@classmethod
def parser(cls, buf, offset):
(hw_src, ) = struct.unpack_from(cls._PACK_STR, buf, offset)
msg = cls(hw_src)
msg = cls(addrconv.mac.bin_to_text(hw_src))
offset += cls._MIN_LEN
if len(buf) > offset:
msg.data = buf[offset:]
@ -252,7 +253,8 @@ class nd_option_la(object):
return msg
def serialize(self):
hdr = bytearray(struct.pack(self._PACK_STR, self.hw_src))
hdr = bytearray(struct.pack(self._PACK_STR,
addrconv.mac.text_to_bin(self.hw_src)))
if self.data is not None:
hdr += bytearray(self.data)

View File

@ -21,7 +21,7 @@ from . import icmp
from . import udp
from . import tcp
from ryu.ofproto import inet
from ryu.lib import ip
from ryu.lib import addrconv
IPV4_ADDRESS_PACK_STR = '!I'
@ -68,8 +68,8 @@ class ipv4(packet_base.PacketBase):
def __init__(self, version=4, header_length=5, tos=0,
total_length=0, identification=0, flags=0,
offset=0, ttl=255, proto=0, csum=0,
src=ip.ipv4_to_bin('0.0.0.0'),
dst=ip.ipv4_to_bin('0.0.0.0'),
src='0.0.0.0',
dst='0.0.0.0',
option=None):
super(ipv4, self).__init__()
self.version = version
@ -103,7 +103,9 @@ class ipv4(packet_base.PacketBase):
else:
option = None
msg = cls(version, header_length, tos, total_length, identification,
flags, offset, ttl, proto, csum, src, dst, option)
flags, offset, ttl, proto, csum,
addrconv.ipv4.bin_to_text(src),
addrconv.ipv4.bin_to_text(dst), option)
return msg, ipv4.get_packet_type(proto), buf[length:total_length]
@ -116,7 +118,9 @@ class ipv4(packet_base.PacketBase):
self.total_length = self.header_length * 4 + len(payload)
struct.pack_into(ipv4._PACK_STR, hdr, 0, version, self.tos,
self.total_length, self.identification, flags,
self.ttl, self.proto, 0, self.src, self.dst)
self.ttl, self.proto, 0,
addrconv.ipv4.text_to_bin(self.src),
addrconv.ipv4.text_to_bin(self.dst))
if self.option:
assert (length - ipv4._MIN_LEN) >= len(self.option)

View File

@ -20,6 +20,7 @@ from . import packet_utils
from . import icmpv6
from . import tcp
from ryu.ofproto import inet
from ryu.lib import addrconv
IPV6_ADDRESS_PACK_STR = '!16s'
@ -73,7 +74,8 @@ class ipv6(packet_base.PacketBase):
flow_label = v_tc_flow & 0xfffff
hop_limit = hlim
msg = cls(version, traffic_class, flow_label, payload_length,
nxt, hop_limit, src, dst)
nxt, hop_limit, addrconv.ipv6.bin_to_text(src),
addrconv.ipv6.bin_to_text(dst))
return msg, ipv6.get_packet_type(nxt), buf[cls._MIN_LEN:payload_length]
def serialize(self, payload, prev):
@ -82,7 +84,8 @@ class ipv6(packet_base.PacketBase):
self.flow_label << 12)
struct.pack_into(ipv6._PACK_STR, hdr, 0, v_tc_flow,
self.payload_length, self.nxt, self.hop_limit,
self.src, self.dst)
addrconv.ipv6.text_to_bin(self.src),
addrconv.ipv6.text_to_bin(self.dst))
return hdr
ipv6.register_packet_type(icmpv6.icmpv6, inet.IPPROTO_ICMPV6)

View File

@ -45,9 +45,9 @@ from ryu.lib.packet import packet_base
# LLDP destination MAC address
LLDP_MAC_NEAREST_BRIDGE = '\x01\x80\xc2\x00\x00\x0e'
LLDP_MAC_NEAREST_NON_TPMR_BRIDGE = '\x01\x80\xc2\x00\x00\x03'
LLDP_MAC_NEAREST_CUSTOMER_BRIDGE = '\x01\x80\xc2\x00\x00\x00'
LLDP_MAC_NEAREST_BRIDGE = '01:80:c2:00:00:0e'
LLDP_MAC_NEAREST_NON_TPMR_BRIDGE = '01:80:c2:00:00:03'
LLDP_MAC_NEAREST_CUSTOMER_BRIDGE = '01:80:c2:00:00:00'
LLDP_TLV_TYPELEN_STR = '!H'

View File

@ -16,6 +16,7 @@
import array
import socket
import struct
from ryu.lib import addrconv
def carry_around_add(a, b):
@ -83,10 +84,14 @@ def checksum_ip(ipvx, length, payload):
"""
if ipvx.version == 4:
header = struct.pack(_IPV4_PSEUDO_HEADER_PACK_STR,
ipvx.src, ipvx.dst, ipvx.proto, length)
addrconv.ipv4.text_to_bin(ipvx.src),
addrconv.ipv4.text_to_bin(ipvx.dst),
ipvx.proto, length)
elif ipvx.version == 6:
header = struct.pack(_IPV6_PSEUDO_HEADER_PACK_STR,
ipvx.src, ipvx.dst, length, ipvx.nxt)
addrconv.ipv6.text_to_bin(ipvx.src),
addrconv.ipv6.text_to_bin(ipvx.dst),
length, ipvx.nxt)
else:
raise ValueError('Unknown IP version %d' % ipvx.version)

View File

@ -15,10 +15,10 @@
import struct
from . import packet_base
from ryu.lib import mac
from ryu.lib import addrconv
# Slow Protocol Multicast destination
SLOW_PROTOCOL_MULTICAST = '\x01\x80\xc2\x00\x00\x02'
SLOW_PROTOCOL_MULTICAST = '01:80:c2:00:00:02'
# Slow Protocol SubType
SLOW_SUBTYPE_LACP = 0x01
@ -368,7 +368,7 @@ class lacp(packet_base.PacketBase):
def __init__(self, version=LACP_VERSION_NUMBER,
actor_system_priority=0,
actor_system=mac.haddr_to_bin('00:00:00:00:00:00'),
actor_system='00:00:00:00:00:00',
actor_key=0, actor_port_priority=0, actor_port=0,
actor_state_activity=0, actor_state_timeout=0,
actor_state_aggregation=0,
@ -376,7 +376,7 @@ class lacp(packet_base.PacketBase):
actor_state_collecting=0, actor_state_distributing=0,
actor_state_defaulted=0, actor_state_expired=0,
partner_system_priority=0,
partner_system=mac.haddr_to_bin('00:00:00:00:00:00'),
partner_system='00:00:00:00:00:00',
partner_key=0, partner_port_priority=0, partner_port=0,
partner_state_activity=0, partner_state_timeout=0,
partner_state_aggregation=0,
@ -537,14 +537,17 @@ class lacp(packet_base.PacketBase):
) = struct.unpack_from(cls._TRM_PACK_STR, buf, offset)
assert cls.LACP_TLV_TYPE_TERMINATOR == terminator_tag
assert 0 == terminator_length
return cls(version, actor_system_priority,
actor_system, actor_key, actor_port_priority,
return cls(version,
actor_system_priority,
addrconv.mac.bin_to_text(actor_system),
actor_key, actor_port_priority,
actor_port, actor_state_activity,
actor_state_timeout, actor_state_aggregation,
actor_state_synchronization, actor_state_collecting,
actor_state_distributing, actor_state_defaulted,
actor_state_expired, partner_system_priority,
partner_system, partner_key, partner_port_priority,
addrconv.mac.bin_to_text(partner_system),
partner_key, partner_port_priority,
partner_port, partner_state_activity,
partner_state_timeout, partner_state_aggregation,
partner_state_synchronization,
@ -558,13 +561,15 @@ class lacp(packet_base.PacketBase):
actor = struct.pack(self._ACTPRT_INFO_PACK_STR,
self.actor_tag, self.actor_length,
self.actor_system_priority,
self.actor_system, self.actor_key,
addrconv.mac.text_to_bin(self.actor_system),
self.actor_key,
self.actor_port_priority, self.actor_port,
self.actor_state)
partner = struct.pack(self._ACTPRT_INFO_PACK_STR,
self.partner_tag, self.partner_length,
self.partner_system_priority,
self.partner_system, self.partner_key,
addrconv.mac.text_to_bin(self.partner_system),
self.partner_key,
self.partner_port_priority,
self.partner_port, self.partner_state)
collector = struct.pack(self._COL_INFO_PACK_STR,

View File

@ -67,7 +67,6 @@ VRRP v3 packet format
"""
import netaddr
import struct
from ryu.lib.packet import ethernet
@ -79,36 +78,31 @@ from ryu.lib.packet import packet_utils
from ryu.lib.packet import vlan
from ryu.ofproto import ether
from ryu.ofproto import inet
from ryu.lib import addrconv
# IPv4
# the LSB 8 bits is used for VRID
VRRP_IPV4_SRC_MAC_ADDRESS_STR = '00:00:5E:00:01:00'
VRRP_IPV4_SRC_MAC_ADDRESS = netaddr.EUI(VRRP_IPV4_SRC_MAC_ADDRESS_STR).packed
VRRP_IPV4_DST_MAC_ADDRESS_STR = '01:00:5E:00:00:12'
VRRP_IPV4_DST_MAC_ADDRESS = netaddr.EUI(VRRP_IPV4_DST_MAC_ADDRESS_STR).packed
VRRP_IPV4_DST_ADDRESS_STR = '224.0.0.18'
VRRP_IPV4_DST_ADDRESS = netaddr.IPAddress(VRRP_IPV4_DST_ADDRESS_STR).packed
VRRP_IPV4_SRC_MAC_ADDRESS_FMT = '00:00:5E:00:01:%02x'
VRRP_IPV4_DST_MAC_ADDRESS = '01:00:5E:00:00:12'
VRRP_IPV4_DST_ADDRESS = '224.0.0.18'
VRRP_IPV4_TTL = 255
def vrrp_ipv4_src_mac_address(vrid):
return VRRP_IPV4_SRC_MAC_ADDRESS[:-1] + chr(vrid)
return VRRP_IPV4_SRC_MAC_ADDRESS_FMT % vrid
# IPv6
# the LSB 8 bits is used for VRID
VRRP_IPV6_SRC_MAC_ADDRESS_STR = '00:00:5E:00:02:00'
VRRP_IPV6_SRC_MAC_ADDRESS = netaddr.EUI(VRRP_IPV6_SRC_MAC_ADDRESS_STR).packed
VRRP_IPV6_DST_MAC_ADDRESS_STR = '33:33:00:00:00:12'
VRRP_IPV6_DST_MAC_ADDRESS = netaddr.EUI(VRRP_IPV6_DST_MAC_ADDRESS_STR).packed
VRRP_IPV6_DST_ADDRESS_STR = 'FF02:0:0:0:0:0:0:12'
VRRP_IPV6_DST_ADDRESS = netaddr.IPAddress(VRRP_IPV6_DST_ADDRESS_STR).packed
VRRP_IPV6_SRC_MAC_ADDRESS_FMT = '00:00:5E:00:02:%02x'
VRRP_IPV6_DST_MAC_ADDRESS = '33:33:00:00:00:12'
VRRP_IPV6_DST_ADDRESS = 'ff02::12'
VRRP_IPV6_HOP_LIMIT = 255
def vrrp_ipv6_src_mac_address(vrid):
return VRRP_IPV6_SRC_MAC_ADDRESS[:-1] + chr(vrid)
return VRRP_IPV6_SRC_MAC_ADDRESS_FMT % vrid
VRRP_VERSION_SHIFT = 4
@ -167,10 +161,12 @@ VRRP_V2_MAX_ADVER_INT_MAX = 0xff
def is_ipv6(ip_address):
assert type(ip_address) == str
if len(ip_address) == 4:
return False
assert len(ip_address) == 16
try:
addrconv.ipv4.text_to_bin(ip_address)
except:
addrconv.ipv6.text_to_bin(ip_address) # sanity
return True
return False
class vrrp(packet_base.PacketBase):
@ -449,7 +445,10 @@ class vrrpv2(vrrp):
offset = cls._MIN_LEN
ip_addresses_pack_str = cls._ip_addresses_pack_str(count_ip)
ip_addresses = struct.unpack_from(ip_addresses_pack_str, buf, offset)
ip_addresses_bin = struct.unpack_from(ip_addresses_pack_str, buf,
offset)
ip_addresses = map(lambda x: addrconv.ipv4.bin_to_text(x),
ip_addresses_bin)
offset += struct.calcsize(ip_addresses_pack_str)
auth_data = struct.unpack_from(cls._AUTH_DATA_PACK_STR, buf, offset)
@ -484,7 +483,8 @@ class vrrpv2(vrrp):
vrrp_.checksum)
offset += vrrpv2._MIN_LEN
struct.pack_into(ip_addresses_pack_str, buf, offset,
*vrrp_.ip_addresses)
*map(lambda x: addrconv.ipv4.text_to_bin(x),
vrrp_.ip_addresses))
offset += ip_addresses_len
struct.pack_into(vrrpv2._AUTH_DATA_PACK_STR, buf, offset,
*vrrp_.auth_data)
@ -564,14 +564,17 @@ class vrrpv3(vrrp):
# Unfortunately it isn't available. Guess it by vrrp packet length.
if address_len == cls._IPV4_ADDRESS_LEN:
pack_str = '!' + cls._IPV4_ADDRESS_PACK_STR_RAW * count_ip
conv = addrconv.ipv4.bin_to_text
elif address_len == cls._IPV6_ADDRESS_LEN:
pack_str = '!' + cls._IPV6_ADDRESS_PACK_STR_RAW * count_ip
conv = addrconv.ipv6.bin_to_text
else:
raise ValueError(
'unknown address version address_len %d count_ip %d' % (
address_len, count_ip))
ip_addresses = struct.unpack_from(pack_str, buf, offset)
ip_addresses_bin = struct.unpack_from(pack_str, buf, offset)
ip_addresses = map(lambda x: conv(x), ip_addresses_bin)
msg = cls(version, type_, vrid, priority,
count_ip, max_adver_int, checksum, ip_addresses)
return msg, None, buf[len(msg):]
@ -580,11 +583,11 @@ class vrrpv3(vrrp):
def serialize_static(vrrp_, prev):
if isinstance(prev, ipv4.ipv4):
assert type(vrrp_.ip_addresses[0]) == str
assert len(vrrp_.ip_addresses[0]) == 4
conv = addrconv.ipv4.text_to_bin
ip_address_pack_raw = vrrpv3._IPV4_ADDRESS_PACK_STR_RAW
elif isinstance(prev, ipv6.ipv6):
assert type(vrrp_.ip_addresses[0]) == str
assert len(vrrp_.ip_addresses[0]) == vrrp._IPV6_ADDRESS_LEN
conv = addrconv.ipv6.text_to_bin
ip_address_pack_raw = vrrpv3._IPV6_ADDRESS_PACK_STR_RAW
else:
raise ValueError('Unkown network layer %s' % type(prev))
@ -605,7 +608,7 @@ class vrrpv3(vrrp):
vrrp_.vrid, vrrp_.priority,
vrrp_.count_ip, vrrp_.max_adver_int, vrrp_.checksum)
struct.pack_into(ip_addresses_pack_str, buf, vrrpv3._MIN_LEN,
*vrrp_.ip_addresses)
*map(lambda x: conv(x), vrrp_.ip_addresses))
if checksum:
vrrp_.checksum = packet_utils.checksum_ip(prev, len(buf), buf)

View File

@ -18,16 +18,15 @@
import unittest
import logging
import struct
import netaddr
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.arp import arp
from ryu.lib.packet.vlan import vlan
from ryu.lib import addrconv
LOG = logging.getLogger('test_arp')
@ -42,14 +41,17 @@ class Test_arp(unittest.TestCase):
hlen = 6
plen = 4
opcode = 1
src_mac = mac.haddr_to_bin('00:07:0d:af:f4:54')
src_ip = netaddr.IPAddress('24.166.172.1').packed
dst_mac = mac.haddr_to_bin('00:00:00:00:00:00')
dst_ip = netaddr.IPAddress('24.166.173.159').packed
src_mac = '00:07:0d:af:f4:54'
src_ip = '24.166.172.1'
dst_mac = '00:00:00:00:00:00'
dst_ip = '24.166.173.159'
fmt = arp._PACK_STR
buf = pack(fmt, hwtype, proto, hlen, plen, opcode, src_mac, src_ip,
dst_mac, dst_ip)
buf = pack(fmt, hwtype, proto, hlen, plen, opcode,
addrconv.mac.text_to_bin(src_mac),
addrconv.ipv4.text_to_bin(src_ip),
addrconv.mac.text_to_bin(dst_mac),
addrconv.ipv4.text_to_bin(dst_ip))
a = arp(hwtype, proto, hlen, plen, opcode, src_mac, src_ip, dst_mac,
dst_ip)
@ -106,10 +108,10 @@ class Test_arp(unittest.TestCase):
eq_(res[2], self.hlen)
eq_(res[3], self.plen)
eq_(res[4], self.opcode)
eq_(res[5], self.src_mac)
eq_(res[6], self.src_ip)
eq_(res[7], self.dst_mac)
eq_(res[8], self.dst_ip)
eq_(addrconv.mac.bin_to_text(res[5]), self.src_mac)
eq_(addrconv.ipv4.bin_to_text(res[6]), self.src_ip)
eq_(addrconv.mac.bin_to_text(res[7]), self.dst_mac)
eq_(addrconv.ipv4.bin_to_text(res[8]), self.dst_ip)
def _build_arp(self, vlan_enabled):
if vlan_enabled is True:

View File

@ -23,10 +23,10 @@ from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.arp import arp
from ryu.lib import addrconv
LOG = logging.getLogger('test_ethernet')
@ -36,11 +36,13 @@ class Test_ethernet(unittest.TestCase):
""" Test case for ethernet
"""
dst = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA')
src = mac.haddr_to_bin('BB:BB:BB:BB:BB:BB')
dst = 'aa:aa:aa:aa:aa:aa'
src = 'bb:bb:bb:bb:bb:bb'
ethertype = ether.ETH_TYPE_ARP
buf = pack(ethernet._PACK_STR, dst, src, ethertype)
buf = pack(ethernet._PACK_STR,
addrconv.mac.text_to_bin(dst),
addrconv.mac.text_to_bin(src), ethertype)
e = ethernet(dst, src, ethertype)
@ -77,8 +79,8 @@ class Test_ethernet(unittest.TestCase):
fmt = ethernet._PACK_STR
res = struct.unpack(fmt, buf)
eq_(res[0], self.dst)
eq_(res[1], self.src)
eq_(addrconv.mac.bin_to_text(res[0]), self.dst)
eq_(addrconv.mac.bin_to_text(res[1]), self.src)
eq_(res[2], self.ethertype)
@raises(Exception)

View File

@ -18,7 +18,6 @@
import unittest
import logging
import struct
import netaddr
from nose.tools import ok_, eq_, nottest, raises
from nose.plugins.skip import Skip, SkipTest
@ -28,13 +27,16 @@ from ryu.lib.packet.packet import Packet
from ryu.lib.packet import icmpv6
from ryu.lib.packet.ipv6 import ipv6
from ryu.lib.packet import packet_utils
from ryu.lib import addrconv
LOG = logging.getLogger(__name__)
def icmpv6_csum(prev, buf):
ph = struct.pack('!16s16sI3xB', prev.src, prev.dst,
ph = struct.pack('!16s16sI3xB',
addrconv.ipv6.text_to_bin(prev.src),
addrconv.ipv6.text_to_bin(prev.dst),
prev.payload_length, prev.nxt)
h = bytearray(buf)
struct.pack_into('!H', h, 2, 0)
@ -70,8 +72,8 @@ class Test_icmpv6_header(unittest.TestCase):
eq_(n, None)
def test_serialize(self):
src_ipv6 = netaddr.IPAddress('fe80::200:ff:fe00:ef').packed
dst_ipv6 = netaddr.IPAddress('fe80::200:ff:fe00:1').packed
src_ipv6 = 'fe80::200:ff:fe00:ef'
dst_ipv6 = 'fe80::200:ff:fe00:1'
prev = ipv6(6, 0, 0, 4, 58, 255, src_ipv6, dst_ipv6)
buf = self.icmp.serialize(bytearray(), prev)
@ -128,8 +130,8 @@ class Test_icmpv6_echo_request(unittest.TestCase):
def _test_serialize(self, echo_data=None):
buf = self.buf + str(echo_data or '')
src_ipv6 = netaddr.IPAddress('3ffe:507:0:1:200:86ff:fe05:80da').packed
dst_ipv6 = netaddr.IPAddress('3ffe:501:0:1001::2').packed
src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
dst_ipv6 = '3ffe:501:0:1001::2'
prev = ipv6(6, 0, 0, len(buf), 64, 255, src_ipv6, dst_ipv6)
echo_csum = icmpv6_csum(prev, buf)
@ -168,16 +170,16 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
code = 0
csum = 0x952d
res = 0
dst = netaddr.IPAddress('3ffe:507:0:1:200:86ff:fe05:80da').packed
dst = '3ffe:507:0:1:200:86ff:fe05:80da'
nd_type = 1
nd_length = 1
nd_hw_src = '\x00\x60\x97\x07\x69\xea'
nd_hw_src = '00:60:97:07:69:ea'
data = '\x01\x01\x00\x60\x97\x07\x69\xea'
buf = '\x87\x00\x95\x2d\x00\x00\x00\x00' \
+ '\x3f\xfe\x05\x07\x00\x00\x00\x01' \
+ '\x02\x00\x86\xff\xfe\x05\x80\xda'
src_ipv6 = netaddr.IPAddress('3ffe:507:0:1:200:86ff:fe05:80da').packed
dst_ipv6 = netaddr.IPAddress('3ffe:501:0:1001::2').packed
src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
dst_ipv6 = '3ffe:501:0:1001::2'
def setUp(self):
pass
@ -232,7 +234,7 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
eq_(code, self.code)
eq_(csum, nd_csum)
eq_(res >> 29, self.res)
eq_(dst, self.dst)
eq_(addrconv.ipv6.bin_to_text(dst), self.dst)
eq_(data, '')
def test_serialize_with_data(self):
@ -255,10 +257,10 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
eq_(code, self.code)
eq_(csum, nd_csum)
eq_(res >> 29, self.res)
eq_(dst, self.dst)
eq_(addrconv.ipv6.bin_to_text(dst), self.dst)
eq_(nd_type, self.nd_type)
eq_(nd_length, self.nd_length)
eq_(nd_hw_src, self.nd_hw_src)
eq_(addrconv.mac.bin_to_text(nd_hw_src), self.nd_hw_src)
class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict):
@ -266,11 +268,11 @@ class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict):
self.type_ = 136
self.csum = 0xb8ba
self.res = 7
self.dst = netaddr.IPAddress('3ffe:507:0:1:260:97ff:fe07:69ea').packed
self.dst = '3ffe:507:0:1:260:97ff:fe07:69ea'
self.nd_type = 2
self.nd_length = 1
self.nd_data = None
self.nd_hw_src = '\x00\x60\x97\x07\x69\xea'
self.nd_hw_src = '00:60:97:07:69:ea'
self.data = '\x02\x01\x00\x60\x97\x07\x69\xea'
self.buf = '\x88\x00\xb8\xba\xe0\x00\x00\x00' \
+ '\x3f\xfe\x05\x07\x00\x00\x00\x01' \
@ -316,8 +318,8 @@ class Test_icmpv6_router_solict(unittest.TestCase):
def _test_serialize(self, nd_data=None):
nd_data = str(nd_data or '')
buf = self.buf + nd_data
src_ipv6 = netaddr.IPAddress('fe80::102d:a5ff:fe6d:bc0f').packed
dst_ipv6 = netaddr.IPAddress('ff02::2').packed
src_ipv6 = 'fe80::102d:a5ff:fe6d:bc0f'
dst_ipv6 = 'ff02::2'
prev = ipv6(6, 0, 0, len(buf), 58, 255, src_ipv6, dst_ipv6)
nd_csum = icmpv6_csum(prev, buf)

View File

@ -22,13 +22,12 @@ from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet import packet_utils
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.ipv4 import ipv4
from ryu.lib.packet.tcp import tcp
import netaddr
from ryu.lib import addrconv
LOG = logging.getLogger('test_ipv4')
@ -50,8 +49,8 @@ class Test_ipv4(unittest.TestCase):
ttl = 64
proto = inet.IPPROTO_TCP
csum = 0xadc6
src = netaddr.IPAddress('131.151.32.21').packed
dst = netaddr.IPAddress('131.151.32.129').packed
src = '131.151.32.21'
dst = '131.151.32.129'
length = header_length * 4
option = '\x86\x28\x00\x00\x00\x01\x01\x22' \
+ '\x00\x01\xae\x00\x00\x00\x00\x00' \
@ -60,7 +59,9 @@ class Test_ipv4(unittest.TestCase):
+ '\x00\x00\x00\x00\x00\x00\x00\x01'
buf = pack(ipv4._PACK_STR, ver_hlen, tos, total_length, identification,
flg_off, ttl, proto, csum, src, dst) \
flg_off, ttl, proto, csum,
addrconv.ipv4.text_to_bin(src),
addrconv.ipv4.text_to_bin(dst)) \
+ option
ip = ipv4(version, header_length, tos, total_length, identification,
@ -117,8 +118,8 @@ class Test_ipv4(unittest.TestCase):
eq_(res[4], self.flg_off)
eq_(res[5], self.ttl)
eq_(res[6], self.proto)
eq_(res[8], self.src)
eq_(res[9], self.dst)
eq_(addrconv.ipv4.bin_to_text(res[8]), self.src)
eq_(addrconv.ipv4.bin_to_text(res[9]), self.dst)
eq_(option, self.option)
# checksum

View File

@ -24,6 +24,7 @@ from ryu.ofproto import ether
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import lldp
from ryu.lib import addrconv
LOG = logging.getLogger(__name__)
@ -98,13 +99,14 @@ class TestLLDPMandatoryTLV(unittest.TestCase):
pkt = packet.Packet()
dst = lldp.LLDP_MAC_NEAREST_BRIDGE
src = '\x00\x04\x96\x1f\xa7\x26'
src = '00:04:96:1f:a7:26'
ethertype = ether.ETH_TYPE_LLDP
eth_pkt = ethernet.ethernet(dst, src, ethertype)
pkt.add_protocol(eth_pkt)
tlv_chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS,
chassis_id=src)
chassis_id=addrconv.mac.
text_to_bin(src))
tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME,
port_id='1/3')
tlv_ttl = lldp.TTL(ttl=120)
@ -216,13 +218,14 @@ class TestLLDPOptionalTLV(unittest.TestCase):
pkt = packet.Packet()
dst = lldp.LLDP_MAC_NEAREST_BRIDGE
src = '\x00\x01\x30\xf9\xad\xa0'
src = '00:01:30:f9:ad:a0'
ethertype = ether.ETH_TYPE_LLDP
eth_pkt = ethernet.ethernet(dst, src, ethertype)
pkt.add_protocol(eth_pkt)
tlv_chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS,
chassis_id=src)
chassis_id=addrconv.mac.
text_to_bin(src))
tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME,
port_id='1/1')
tlv_ttl = lldp.TTL(ttl=120)

View File

@ -18,13 +18,12 @@
import unittest
import logging
import struct
import netaddr
import array
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet import *
from ryu.lib import addrconv
LOG = logging.getLogger('test_packet')
@ -34,10 +33,14 @@ class TestPacket(unittest.TestCase):
""" Test case for packet
"""
dst_mac = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA')
src_mac = mac.haddr_to_bin('BB:BB:BB:BB:BB:BB')
dst_ip_bin = dst_ip = netaddr.IPAddress('192.168.128.10').packed
src_ip_bin = src_ip = netaddr.IPAddress('192.168.122.20').packed
dst_mac = 'aa:aa:aa:aa:aa:aa'
src_mac = 'bb:bb:bb:bb:bb:bb'
dst_mac_bin = addrconv.mac.text_to_bin(dst_mac)
src_mac_bin = addrconv.mac.text_to_bin(src_mac)
dst_ip = '192.168.128.10'
src_ip = '192.168.122.20'
dst_ip_bin = addrconv.ipv4.text_to_bin(dst_ip)
src_ip_bin = addrconv.ipv4.text_to_bin(src_ip)
payload = '\x06\x06\x47\x50\x00\x00\x00\x00' \
+ '\xcd\xc5\x00\x00\x00\x00\x00\x00' \
+ '\x10\x11\x12\x13\x14\x15\x16\x17' \
@ -71,8 +74,8 @@ class TestPacket(unittest.TestCase):
p.serialize()
# ethernet !6s6sH
e_buf = self.dst_mac \
+ self.src_mac \
e_buf = self.dst_mac_bin \
+ self.src_mac_bin \
+ '\x08\x06'
# arp !HHBBH6sI6sI
@ -81,9 +84,9 @@ class TestPacket(unittest.TestCase):
+ '\x06' \
+ '\x04' \
+ '\x00\x02' \
+ self.src_mac \
+ self.src_mac_bin \
+ self.src_ip_bin \
+ self.dst_mac \
+ self.dst_mac_bin \
+ self.dst_ip_bin
buf = e_buf + a_buf
@ -128,8 +131,8 @@ class TestPacket(unittest.TestCase):
p.serialize()
# ethernet !6s6sH
e_buf = self.dst_mac \
+ self.src_mac \
e_buf = self.dst_mac_bin \
+ self.src_mac_bin \
+ '\x81\x00'
# vlan !HH
@ -142,9 +145,9 @@ class TestPacket(unittest.TestCase):
+ '\x06' \
+ '\x04' \
+ '\x00\x02' \
+ self.src_mac \
+ self.src_mac_bin \
+ self.src_ip_bin \
+ self.dst_mac \
+ self.dst_mac_bin \
+ self.dst_ip_bin
buf = e_buf + v_buf + a_buf
@ -198,8 +201,8 @@ class TestPacket(unittest.TestCase):
p.serialize()
# ethernet !6s6sH
e_buf = self.dst_mac \
+ self.src_mac \
e_buf = self.dst_mac_bin \
+ self.src_mac_bin \
+ '\x08\x00'
# ipv4 !BBHHHBBHII
@ -260,7 +263,7 @@ class TestPacket(unittest.TestCase):
eq_(0x77b2, p_udp.csum)
t = bytearray(u_buf)
struct.pack_into('!H', t, 6, p_udp.csum)
ph = struct.pack('!4s4sBBH', self.src_ip, self.dst_ip, 0,
ph = struct.pack('!4s4sBBH', self.src_ip_bin, self.dst_ip_bin, 0,
17, len(u_buf) + len(self.payload))
t = ph + t + self.payload
eq_(packet_utils.checksum(t), 0)
@ -286,8 +289,8 @@ class TestPacket(unittest.TestCase):
p.serialize()
# ethernet !6s6sH
e_buf = self.dst_mac \
+ self.src_mac \
e_buf = self.dst_mac_bin \
+ self.src_mac_bin \
+ '\x08\x00'
# ipv4 !BBHHHBBHII
@ -359,7 +362,7 @@ class TestPacket(unittest.TestCase):
eq_(len(t_buf), len(p_tcp))
t = bytearray(t_buf)
struct.pack_into('!H', t, 16, p_tcp.csum)
ph = struct.pack('!4s4sBBH', self.src_ip, self.dst_ip, 0,
ph = struct.pack('!4s4sBBH', self.src_ip_bin, self.dst_ip_bin, 0,
6, len(t_buf) + len(self.payload))
t = ph + t + self.payload
eq_(packet_utils.checksum(t), 0)

View File

@ -21,9 +21,9 @@ import copy
from struct import pack, unpack_from
from nose.tools import ok_, eq_, raises
from ryu.ofproto import ether
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib import addrconv
from ryu.lib.packet.slow import slow, lacp
from ryu.lib.packet.slow import SLOW_PROTOCOL_MULTICAST
from ryu.lib.packet.slow import SLOW_SUBTYPE_LACP
@ -41,7 +41,7 @@ class Test_slow(unittest.TestCase):
self.actor_tag = lacp.LACP_TLV_TYPE_ACTOR
self.actor_length = 20
self.actor_system_priority = 65534
self.actor_system = mac.haddr_to_bin('00:07:0d:af:f4:54')
self.actor_system = '00:07:0d:af:f4:54'
self.actor_key = 1
self.actor_port_priority = 65535
self.actor_port = 1
@ -65,7 +65,7 @@ class Test_slow(unittest.TestCase):
self.partner_tag = lacp.LACP_TLV_TYPE_PARTNER
self.partner_length = 20
self.partner_system_priority = 0
self.partner_system = mac.haddr_to_bin('00:00:00:00:00:00')
self.partner_system = '00:00:00:00:00:00'
self.partner_key = 0
self.partner_port_priority = 0
self.partner_port = 0
@ -167,7 +167,7 @@ class Test_lacp(unittest.TestCase):
self.actor_tag = lacp.LACP_TLV_TYPE_ACTOR
self.actor_length = 20
self.actor_system_priority = 65534
self.actor_system = mac.haddr_to_bin('00:07:0d:af:f4:54')
self.actor_system = '00:07:0d:af:f4:54'
self.actor_key = 1
self.actor_port_priority = 65535
self.actor_port = 1
@ -191,7 +191,7 @@ class Test_lacp(unittest.TestCase):
self.partner_tag = lacp.LACP_TLV_TYPE_PARTNER
self.partner_length = 20
self.partner_system_priority = 0
self.partner_system = mac.haddr_to_bin('00:00:00:00:00:00')
self.partner_system = '00:00:00:00:00:00'
self.partner_key = 0
self.partner_port_priority = 0
self.partner_port = 0
@ -237,7 +237,7 @@ class Test_lacp(unittest.TestCase):
self.actor_tag,
self.actor_length,
self.actor_system_priority,
self.actor_system,
addrconv.mac.text_to_bin(self.actor_system),
self.actor_key,
self.actor_port_priority,
self.actor_port,
@ -246,7 +246,7 @@ class Test_lacp(unittest.TestCase):
self.partner_tag,
self.partner_length,
self.partner_system_priority,
self.partner_system,
addrconv.mac.text_to_bin(self.partner_system),
self.partner_key,
self.partner_port_priority,
self.partner_port,
@ -423,7 +423,7 @@ class Test_lacp(unittest.TestCase):
eq_(act_res[0], self.actor_tag)
eq_(act_res[1], self.actor_length)
eq_(act_res[2], self.actor_system_priority)
eq_(act_res[3], self.actor_system)
eq_(addrconv.mac.bin_to_text(act_res[3]), self.actor_system)
eq_(act_res[4], self.actor_key)
eq_(act_res[5], self.actor_port_priority)
eq_(act_res[6], self.actor_port)
@ -432,7 +432,7 @@ class Test_lacp(unittest.TestCase):
eq_(prt_res[0], self.partner_tag)
eq_(prt_res[1], self.partner_length)
eq_(prt_res[2], self.partner_system_priority)
eq_(prt_res[3], self.partner_system)
eq_(addrconv.mac.bin_to_text(prt_res[3]), self.partner_system)
eq_(prt_res[4], self.partner_key)
eq_(prt_res[5], self.partner_port_priority)
eq_(prt_res[6], self.partner_port)

View File

@ -18,17 +18,15 @@
import unittest
import logging
import struct
import netaddr
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.tcp import tcp
from ryu.lib.packet.ipv4 import ipv4
from ryu.lib.packet import packet_utils
from ryu.lib import addrconv
LOG = logging.getLogger('test_tcp')
@ -92,8 +90,8 @@ class Test_tcp(unittest.TestCase):
offset = 5
csum = 0
src_ip = netaddr.IPAddress('192.168.10.1').packed
dst_ip = netaddr.IPAddress('192.168.100.1').packed
src_ip = '192.168.10.1'
dst_ip = '192.168.100.1'
prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
inet.IPPROTO_TCP, 0, src_ip, dst_ip)
@ -112,7 +110,9 @@ class Test_tcp(unittest.TestCase):
eq_(res[8], self.urgent)
# checksum
ph = struct.pack('!4s4sBBH', src_ip, dst_ip, 0, 6, offset * 4)
ph = struct.pack('!4s4sBBH',
addrconv.ipv4.text_to_bin(src_ip),
addrconv.ipv4.text_to_bin(dst_ip), 0, 6, offset * 4)
d = ph + buf + bytearray()
s = packet_utils.checksum(d)
eq_(0, s)
@ -122,8 +122,8 @@ class Test_tcp(unittest.TestCase):
csum = 0
option = '\x01\x02'
src_ip = netaddr.IPAddress('192.168.10.1').packed
dst_ip = netaddr.IPAddress('192.168.100.1').packed
src_ip = '192.168.10.1'
dst_ip = '192.168.100.1'
prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
inet.IPPROTO_TCP, 0, src_ip, dst_ip)

View File

@ -18,17 +18,15 @@
import unittest
import logging
import struct
import netaddr
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.udp import udp
from ryu.lib.packet.ipv4 import ipv4
from ryu.lib.packet import packet_utils
from ryu.lib import addrconv
LOG = logging.getLogger('test_udp')
@ -71,8 +69,8 @@ class Test_udp(unittest.TestCase):
total_length = 0
csum = 0
src_ip = netaddr.IPAddress('192.168.10.1').packed
dst_ip = netaddr.IPAddress('192.168.100.1').packed
src_ip = '192.168.10.1'
dst_ip = '192.168.100.1'
prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
inet.IPPROTO_UDP, 0, src_ip, dst_ip)
@ -85,7 +83,9 @@ class Test_udp(unittest.TestCase):
eq_(res[2], struct.calcsize(udp._PACK_STR))
# checksum
ph = struct.pack('!4s4sBBH', src_ip, dst_ip, 0, 17, res[2])
ph = struct.pack('!4s4sBBH',
addrconv.ipv4.text_to_bin(src_ip),
addrconv.ipv4.text_to_bin(dst_ip), 0, 17, res[2])
d = ph + buf + bytearray()
s = packet_utils.checksum(d)
eq_(0, s)

View File

@ -18,12 +18,10 @@
import unittest
import logging
import struct
import netaddr
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.ipv4 import ipv4
@ -85,8 +83,8 @@ class Test_vlan(unittest.TestCase):
eq_(res[1], self.ethertype)
def _build_vlan(self):
src_mac = mac.haddr_to_bin('00:07:0d:af:f4:54')
dst_mac = mac.haddr_to_bin('00:00:00:00:00:00')
src_mac = '00:07:0d:af:f4:54'
dst_mac = '00:00:00:00:00:00'
ethertype = ether.ETH_TYPE_8021Q
e = ethernet(dst_mac, src_mac, ethertype)
@ -100,8 +98,8 @@ class Test_vlan(unittest.TestCase):
ttl = 64
proto = inet.IPPROTO_ICMP
csum = 0xa7f2
src = netaddr.IPAddress('131.151.32.21').packed
dst = netaddr.IPAddress('131.151.32.129').packed
src = '131.151.32.21'
dst = '131.151.32.129'
option = 'TEST'
ip = ipv4(version, header_length, tos, total_length, identification,
flags, offset, ttl, proto, csum, src, dst, option)

View File

@ -19,7 +19,6 @@
import unittest
import logging
import struct
import netaddr
from nose.tools import eq_, ok_
from nose.tools import raises
@ -30,6 +29,7 @@ from ryu.lib.packet import ipv6
from ryu.lib.packet import packet
from ryu.lib.packet import packet_utils
from ryu.lib.packet import vrrp
from ryu.lib import addrconv
LOG = logging.getLogger(__name__)
@ -46,14 +46,15 @@ class Test_vrrpv2(unittest.TestCase):
auth_type = vrrp.VRRP_AUTH_NO_AUTH
max_adver_int = 100
checksum = 0
ip_address = netaddr.IPAddress('192.168.0.1').packed
ip_address = '192.168.0.1'
auth_data = (0, 0)
vrrpv2 = vrrp.vrrpv2.create(type_, vrid, priority, max_adver_int,
[ip_address])
buf = struct.pack(vrrp.vrrpv2._PACK_STR + '4sII',
vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V2, type_),
vrid, priority, count_ip,
auth_type, max_adver_int, checksum, ip_address,
auth_type, max_adver_int, checksum,
addrconv.ipv4.text_to_bin(ip_address),
auth_data[0], auth_data[1])
def setUp(self):
@ -89,7 +90,7 @@ class Test_vrrpv2(unittest.TestCase):
eq_(self.auth_data, vrrpv2.auth_data)
def test_serialize(self):
src_ip = netaddr.IPAddress('192.168.0.1').packed
src_ip = '192.168.0.1'
dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
inet.IPPROTO_VRRP, 0, src_ip, dst_ip)
@ -98,7 +99,7 @@ class Test_vrrpv2(unittest.TestCase):
vrid = 5
priority = 10
max_adver_int = 30
ip_address = netaddr.IPAddress('192.168.0.2').packed
ip_address = '192.168.0.2'
ip_addresses = [ip_address]
vrrp_ = vrrp.vrrpv2.create(
@ -115,7 +116,7 @@ class Test_vrrpv2(unittest.TestCase):
eq_(res[4], vrrp.VRRP_AUTH_NO_AUTH)
eq_(res[5], max_adver_int)
# res[6] is checksum
eq_(res[7], ip_address)
eq_(addrconv.ipv4.bin_to_text(res[7]), ip_address)
eq_(res[8], 0)
eq_(res[9], 0)
eq_(len(buf), pack_len)
@ -130,7 +131,7 @@ class Test_vrrpv2(unittest.TestCase):
vrrp.vrrp.parser(m_short_buf)
def test_create_packet(self):
primary_ip = netaddr.IPAddress('192.168.0.2').packed
primary_ip = '192.168.0.2'
p0 = self.vrrpv2.create_packet(primary_ip)
p0.serialize()
p1 = packet.Packet(str(p0.data))
@ -193,13 +194,14 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
count_ip = 1
max_adver_int = 111
checksum = 0
ip_address = netaddr.IPAddress('192.168.0.1').packed
ip_address = '192.168.0.1'
vrrpv3 = vrrp.vrrpv3.create(type_, vrid, priority, max_adver_int,
[ip_address])
buf = struct.pack(vrrp.vrrpv3._PACK_STR + '4s',
vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_),
vrid, priority, count_ip,
max_adver_int, checksum, ip_address)
max_adver_int, checksum,
addrconv.ipv4.text_to_bin(ip_address))
def setUp(self):
pass
@ -230,7 +232,7 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
eq_(self.ip_address, vrrpv3.ip_addresses[0])
def test_serialize(self):
src_ip = netaddr.IPAddress('192.168.0.1').packed
src_ip = '192.168.0.1'
dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
inet.IPPROTO_VRRP, 0, src_ip, dst_ip)
@ -239,7 +241,7 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
vrid = 5
priority = 10
max_adver_int = 30
ip_address = netaddr.IPAddress('192.168.0.2').packed
ip_address = '192.168.0.2'
ip_addresses = [ip_address]
vrrp_ = vrrp.vrrpv3.create(
@ -256,13 +258,15 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
eq_(res[3], len(ip_addresses))
eq_(res[4], max_adver_int)
# res[5] is checksum
eq_(res[6], ip_address)
eq_(addrconv.ipv4.bin_to_text(res[6]), ip_address)
eq_(len(buf), pack_len)
print(res)
# checksum
ph = struct.pack('!4s4sxBH', src_ip, dst_ip, inet.IPPROTO_VRRP,
pack_len)
ph = struct.pack('!4s4sxBH',
addrconv.ipv4.text_to_bin(src_ip),
addrconv.ipv4.text_to_bin(dst_ip),
inet.IPPROTO_VRRP, pack_len)
s = packet_utils.checksum(ph + buf)
eq_(0, s)
@ -272,7 +276,7 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
vrrp.vrrp.parser(m_short_buf)
def test_create_packet(self):
primary_ip = netaddr.IPAddress('192.168.0.2').packed
primary_ip = '192.168.0.2'
p0 = self.vrrpv3.create_packet(primary_ip)
p0.serialize()
p1 = packet.Packet(str(p0.data))
@ -335,13 +339,14 @@ class Test_vrrpv3_ipv6(unittest.TestCase):
count_ip = 1
max_adver_int = 111
checksum = 0
ip_address = netaddr.IPAddress('2001:DB8:2000::1').packed
ip_address = '2001:db8:2000::1'
vrrpv3 = vrrp.vrrpv3.create(type_, vrid, priority, max_adver_int,
[ip_address])
buf = struct.pack(vrrp.vrrpv3._PACK_STR + '16s',
vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V3, type_),
vrid, priority, count_ip,
max_adver_int, checksum, ip_address)
max_adver_int, checksum,
addrconv.ipv6.text_to_bin(ip_address))
def setUp(self):
pass
@ -372,7 +377,7 @@ class Test_vrrpv3_ipv6(unittest.TestCase):
eq_(self.ip_address, vrrpv3.ip_addresses[0])
def test_serialize(self):
src_ip = netaddr.IPAddress('2001:DB8:2000::1').packed
src_ip = '2001:db8:2000::1'
dst_ip = vrrp.VRRP_IPV6_DST_ADDRESS
prev = ipv6.ipv6(6, 0, 0, 0, inet.IPPROTO_VRRP,
vrrp.VRRP_IPV6_HOP_LIMIT, src_ip, dst_ip)
@ -381,7 +386,7 @@ class Test_vrrpv3_ipv6(unittest.TestCase):
vrid = 5
priority = 10
max_adver_int = 30
ip_address = netaddr.IPAddress('2001:DB8:2000::2').packed
ip_address = '2001:db8:2000::2'
ip_addresses = [ip_address]
vrrp_ = vrrp.vrrpv3.create(
@ -398,13 +403,15 @@ class Test_vrrpv3_ipv6(unittest.TestCase):
eq_(res[3], len(ip_addresses))
eq_(res[4], max_adver_int)
# res[5] is checksum
eq_(res[6], ip_address)
eq_(addrconv.ipv6.bin_to_text(res[6]), ip_address)
eq_(len(buf), pack_len)
print(res)
# checksum
ph = struct.pack('!16s16sI3xB',
src_ip, dst_ip, pack_len, inet.IPPROTO_VRRP)
addrconv.ipv6.text_to_bin(src_ip),
addrconv.ipv6.text_to_bin(dst_ip),
pack_len, inet.IPPROTO_VRRP)
s = packet_utils.checksum(ph + buf)
eq_(0, s)
@ -414,7 +421,7 @@ class Test_vrrpv3_ipv6(unittest.TestCase):
vrrp.vrrp.parser(m_short_buf)
def test_create_packet(self):
primary_ip = netaddr.IPAddress('2001:DB8:2000::3').packed
primary_ip = '2001:db8:2000::3'
p0 = self.vrrpv3.create_packet(primary_ip)
p0.serialize()
print(len(p0.data), p0.data)