Merge "Add the DHCPReponder for IPv6"
This commit is contained in:
commit
02451f381b
neutron
336
neutron/agent/l2/extensions/dhcp/ipv6.py
Normal file
336
neutron/agent/l2/extensions/dhcp/ipv6.py
Normal file
@ -0,0 +1,336 @@
|
||||
# Copyright (c) 2021 China Unicom Cloud Data Co.,Ltd.
|
||||
# Copyright (c) 2019 - 2020 China Telecom Corporation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import struct
|
||||
import time
|
||||
|
||||
from neutron_lib import constants
|
||||
from os_ken.lib import addrconv
|
||||
from os_ken.lib.packet import dhcp6
|
||||
from os_ken.lib.packet import ethernet
|
||||
from os_ken.lib.packet import in_proto as inet
|
||||
from os_ken.lib.packet import ipv6
|
||||
from os_ken.lib.packet import packet
|
||||
from os_ken.lib.packet import udp
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron.agent.l2.extensions.dhcp import base as dhcp_base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
DHCPV6_TYPE_MAP = {dhcp6.DHCPV6_SOLICIT: 'SOLICIT',
|
||||
dhcp6.DHCPV6_REQUEST: 'REQUEST',
|
||||
dhcp6.DHCPV6_CONFIRM: 'CONFIRM',
|
||||
dhcp6.DHCPV6_RELEASE: 'RELEASE',
|
||||
dhcp6.DHCPV6_RENEW: 'RENEW',
|
||||
dhcp6.DHCPV6_REBIND: 'REBIND'}
|
||||
REQ_TYPES_FOR_REPLY = ['REQUEST', 'CONFIRM', 'RELEASE', 'RENEW', 'REBIND']
|
||||
REQ_TYPE_UNKNOWN = -1
|
||||
# DUID Type: link-layer address plus time (1)
|
||||
DUID_TYPE_LINK_LAYER_ADDRESS_PLUS_TIME = 1
|
||||
# Hardware type: Ethernet (1)
|
||||
HARDWARE_TYPE_ETHERNET = 1
|
||||
# Status Code: Success (0)
|
||||
DHCPV6_STATUS_CODE_SUCCESS = 0
|
||||
# 1/1/2000 in integer style
|
||||
TIME_FIRST_DAY_2000 = 946684800
|
||||
|
||||
# TODO(liuyulong): move to os-ken someday
|
||||
DHCPV6_OPTION_DNS_RECURSIVE_NS = 23
|
||||
DHCPV6_OPTION_DOMAIN_SEARCH_LIST = 24
|
||||
DHCPV6_OPTION_FQDN = 39
|
||||
|
||||
|
||||
class DHCPIPv6Responder(dhcp_base.DHCPResponderBase):
|
||||
|
||||
def __init__(self, agent_api, ext_api, *args, **kwargs):
|
||||
super(DHCPIPv6Responder, self).__init__(agent_api, ext_api,
|
||||
version=dhcp_base.IPV6_STR,
|
||||
*args, **kwargs)
|
||||
|
||||
def _create_duid(self, mac):
|
||||
"""Create a DUID based on the mac address and time.
|
||||
|
||||
For details see RFC 8415:
|
||||
11.2. DUID Based on Link-Layer Address Plus Time (DUID-LLT)
|
||||
https://datatracker.ietf.org/doc/html/rfc8415#section-11.2
|
||||
"""
|
||||
duid_type = struct.pack('!H', DUID_TYPE_LINK_LAYER_ADDRESS_PLUS_TIME)
|
||||
hardware_type = struct.pack('!H', HARDWARE_TYPE_ETHERNET)
|
||||
# DUID Time: time now
|
||||
# Rebase epoch to 1/1/2000.
|
||||
duid_time = struct.pack('!I', int(time.time() - TIME_FIRST_DAY_2000))
|
||||
# Link-layer address
|
||||
mac_bin = addrconv.mac.text_to_bin(str(mac))
|
||||
return duid_type + hardware_type + duid_time + mac_bin
|
||||
|
||||
def get_dhcpv6_client_ident(self, mac, req_options):
|
||||
# DHCPV6_OPTION_CLIENTID = 1
|
||||
for opt in req_options:
|
||||
if opt.code == dhcp6.DHCPV6_OPTION_CLIENTID:
|
||||
return opt.data
|
||||
return self._create_duid(mac)
|
||||
|
||||
def get_dhcpv6_server_ident(self):
|
||||
# DHCPV6_OPTION_SERVERID = 2
|
||||
return self._create_duid(self.hw_addr)
|
||||
|
||||
def get_dhcpv6_status_code(self, message, code=DHCPV6_STATUS_CODE_SUCCESS):
|
||||
# DHCPV6_OPTION_STATUS_CODE = 13
|
||||
status_code = struct.pack('!H', code)
|
||||
message_bin = struct.pack('!%ds' % len(message),
|
||||
bytes(str(message).encode()))
|
||||
return status_code + message_bin
|
||||
|
||||
def _get_ia_na_attrs(self, options):
|
||||
"""Get IA_NA options."""
|
||||
attrs = {}
|
||||
default_value = struct.pack('!i', cfg.CONF.dhcp_lease_duration)
|
||||
|
||||
def _check_and_get_value(opt, start, end, iaid=False):
|
||||
data = struct.unpack('!i', opt.data[start:end])
|
||||
if data and data[0] != 0:
|
||||
# Get request time interval T1 option for IA_NA.
|
||||
# Get request time interval T2 option for IA_NA.
|
||||
# Get request Preferred Lifetime for IA_NA.
|
||||
# Get request Valid Lifetime for IA_NA.
|
||||
# Get request IAID for IA_NA.
|
||||
return opt.data[start:end]
|
||||
elif iaid:
|
||||
# default IAID
|
||||
return struct.pack('!I', 1)
|
||||
# default time or interval
|
||||
return default_value
|
||||
|
||||
for opt in options:
|
||||
if opt.code == dhcp6.DHCPV6_OPTION_IA_NA:
|
||||
attrs['t1'] = _check_and_get_value(opt, 4, 8)
|
||||
attrs['t2'] = _check_and_get_value(opt, 8, 12)
|
||||
attrs['preferred_lifetime'] = _check_and_get_value(
|
||||
opt, -8, -4)
|
||||
attrs['valid_lifetime'] = _check_and_get_value(
|
||||
opt, -4, len(opt.data))
|
||||
attrs['ia_id'] = _check_and_get_value(
|
||||
opt, 0, 4, iaid=True)
|
||||
return attrs
|
||||
|
||||
def _get_ia_na_opt(self, options, ip_addr):
|
||||
# DHCPV6_OPTION_IA_NA = 3
|
||||
attrs = self._get_ia_na_attrs(options)
|
||||
# IA Address
|
||||
# Option: IA Address (5)
|
||||
ia_addr_opt = struct.pack('!H', dhcp6.DHCPV6_OPTION_IAADDR)
|
||||
# IPv6 address: client IPv6 Address
|
||||
ia_addr_bin = addrconv.ipv6.text_to_bin(str(ip_addr))
|
||||
|
||||
ia = (ia_addr_bin + attrs['preferred_lifetime'] +
|
||||
attrs['valid_lifetime'])
|
||||
# Length: 24
|
||||
ia_addr_len = struct.pack('!H', len(ia))
|
||||
ia_addr = ia_addr_opt + ia_addr_len + ia
|
||||
ia_na_data = attrs['ia_id'] + attrs['t1'] + attrs['t2'] + ia_addr
|
||||
return ia_na_data
|
||||
|
||||
def get_dhcp_options(self, mac, ip_info, req_options, req_type):
|
||||
ip_addr = ip_info['ip_address']
|
||||
gateway_ip = ip_info['gateway_ip']
|
||||
dns_nameservers = ip_info['dns_nameservers']
|
||||
|
||||
option_list = []
|
||||
client_ident = self.get_dhcpv6_client_ident(mac, req_options)
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=dhcp6.DHCPV6_OPTION_CLIENTID,
|
||||
data=client_ident,
|
||||
length=len(client_ident)))
|
||||
|
||||
server_id_bin = self.get_dhcpv6_server_ident()
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=dhcp6.DHCPV6_OPTION_SERVERID,
|
||||
data=server_id_bin,
|
||||
length=len(server_id_bin)))
|
||||
|
||||
ia_na_data = self._get_ia_na_opt(req_options, ip_addr)
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=dhcp6.DHCPV6_OPTION_IA_NA,
|
||||
data=ia_na_data, length=len(ia_na_data)))
|
||||
|
||||
# Status Message: success
|
||||
status_bin = self.get_dhcpv6_status_code("success")
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=dhcp6.DHCPV6_OPTION_STATUS_CODE,
|
||||
data=status_bin, length=len(status_bin)))
|
||||
|
||||
if req_type == 'SOLICIT': # for DHCP6 advertise packet
|
||||
# DHCPV6_OPTION_PREFERENCE = 7
|
||||
# Pref-value: 0
|
||||
perference = struct.pack('!b', 0)
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=dhcp6.DHCPV6_OPTION_PREFERENCE,
|
||||
data=perference,
|
||||
length=len(perference)))
|
||||
|
||||
# 24: Domain Search List
|
||||
if req_type == 'REQUEST' and cfg.CONF.dns_domain:
|
||||
# Domain Search List FQDN: default openstacklocal
|
||||
dns_domain = struct.pack(
|
||||
'!%ds' % len(str(cfg.CONF.dns_domain)),
|
||||
bytes(str(cfg.CONF.dns_domain).encode()))
|
||||
dns_str_len = struct.pack('!b', len(dns_domain))
|
||||
dns_str_end = struct.pack('!b', 0)
|
||||
dns_domain_data = dns_str_len + dns_domain + dns_str_end
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=DHCPV6_OPTION_DOMAIN_SEARCH_LIST,
|
||||
data=dns_domain_data,
|
||||
length=len(dns_domain_data)))
|
||||
|
||||
# 23: DNS recursive name server
|
||||
if dns_nameservers:
|
||||
domain_serach = self.get_bin_dns(dns_nameservers)
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=DHCPV6_OPTION_DNS_RECURSIVE_NS,
|
||||
data=domain_serach, length=len(domain_serach)))
|
||||
else:
|
||||
# use gateway as the default DNS server address
|
||||
domain_serach = addrconv.ipv6.text_to_bin(str(gateway_ip))
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=DHCPV6_OPTION_DNS_RECURSIVE_NS,
|
||||
data=domain_serach, length=len(domain_serach)))
|
||||
|
||||
# 39: Fully Qualified Domain Name
|
||||
fqdn = 'host-%s' % ip_addr.replace('.', '-').replace(':', '-')
|
||||
if req_type == 'REQUEST' and cfg.CONF.dns_domain:
|
||||
fqdn = '%s.%s' % (fqdn, cfg.CONF.dns_domain)
|
||||
|
||||
# 0000 0... = Reserved: 0x00
|
||||
# .... .0.. = N bit: Server should perform DNS updates
|
||||
# .... ..1. = O bit: Server has overridden client's S bit preference
|
||||
# .... ...1 = S bit: Server should perform forward DNS updates
|
||||
dns_tag = struct.pack('!b', 3)
|
||||
# Client FQDN: host-<ip-v6-address>
|
||||
fqdn_bin = struct.pack('!%ds' % len(fqdn), bytes(str(fqdn).encode()))
|
||||
fqdn_str_len = struct.pack('!b', len(fqdn_bin))
|
||||
dns_data = (dns_tag + fqdn_str_len + fqdn_bin)
|
||||
option_list.append(
|
||||
dhcp6.option(code=DHCPV6_OPTION_FQDN,
|
||||
data=dns_data, length=len(dns_data)))
|
||||
|
||||
# Final option list
|
||||
options = dhcp6.options(option_list=option_list)
|
||||
return options
|
||||
|
||||
def get_ret_type(self, req_type):
|
||||
if req_type == 'SOLICIT':
|
||||
return dhcp6.DHCPV6_ADVERTISE
|
||||
elif req_type in REQ_TYPES_FOR_REPLY:
|
||||
return dhcp6.DHCPV6_REPLY
|
||||
return REQ_TYPE_UNKNOWN
|
||||
|
||||
def get_ret_packet(self, packet_in, port_info, req_type):
|
||||
ip_info = self.get_port_ip(port_info,
|
||||
ip_version=constants.IP_VERSION_6)
|
||||
if not ip_info:
|
||||
return
|
||||
gateway_ip = ip_info['gateway_ip']
|
||||
mac = port_info['mac_address']
|
||||
|
||||
header_eth = packet_in.get_protocol(ethernet.ethernet)
|
||||
header_ipv6 = packet_in.get_protocol(ipv6.ipv6)
|
||||
header_dhcp = packet_in.get_protocol(dhcp6.dhcp6)
|
||||
|
||||
if req_type == 'CONFIRM':
|
||||
options = self.get_reply_dhcp_options(
|
||||
mac, message="all addresses still on link",
|
||||
req_options=header_dhcp.options.option_list)
|
||||
if req_type == 'RELEASE':
|
||||
options = self.get_reply_dhcp_options(
|
||||
mac, message="release received",
|
||||
req_options=header_dhcp.options.option_list)
|
||||
else:
|
||||
options = self.get_dhcp_options(
|
||||
mac, ip_info,
|
||||
header_dhcp.options.option_list,
|
||||
req_type)
|
||||
|
||||
ret_pkt = packet.Packet()
|
||||
ret_pkt.add_protocol(ethernet.ethernet(
|
||||
ethertype=header_eth.ethertype,
|
||||
dst=header_eth.src,
|
||||
src=self.hw_addr))
|
||||
ret_pkt.add_protocol(
|
||||
ipv6.ipv6(
|
||||
src=gateway_ip,
|
||||
dst=header_ipv6.src,
|
||||
nxt=inet.IPPROTO_UDP))
|
||||
ret_pkt.add_protocol(udp.udp(src_port=constants.DHCPV6_RESPONSE_PORT,
|
||||
dst_port=constants.DHCPV6_CLIENT_PORT))
|
||||
|
||||
ret_type = self.get_ret_type(req_type)
|
||||
|
||||
ret_pkt.add_protocol(dhcp6.dhcp6(
|
||||
ret_type, options,
|
||||
transaction_id=header_dhcp.transaction_id))
|
||||
|
||||
return ret_pkt
|
||||
|
||||
def get_reply_dhcp_options(self, mac, message, req_options):
|
||||
option_list = []
|
||||
client_ident = self.get_dhcpv6_client_ident(mac, req_options)
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=dhcp6.DHCPV6_OPTION_CLIENTID,
|
||||
data=client_ident,
|
||||
length=len(client_ident)))
|
||||
|
||||
server_id_bin = self.get_dhcpv6_server_ident()
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=dhcp6.DHCPV6_OPTION_SERVERID,
|
||||
data=server_id_bin,
|
||||
length=len(server_id_bin)))
|
||||
|
||||
# Status Message: "<message>"
|
||||
status_bin = self.get_dhcpv6_status_code(
|
||||
message, code=DHCPV6_STATUS_CODE_SUCCESS)
|
||||
option_list.append(
|
||||
dhcp6.option(
|
||||
code=dhcp6.DHCPV6_OPTION_STATUS_CODE,
|
||||
data=status_bin, length=len(status_bin)))
|
||||
|
||||
# Final option list
|
||||
options = dhcp6.options(option_list=option_list)
|
||||
return options
|
||||
|
||||
def handle_dhcp(self, datapath, ofport, pkt, port_info):
|
||||
pkt_dhcp = pkt.get_protocol(dhcp6.dhcp6)
|
||||
dhcp_req_state = DHCPV6_TYPE_MAP.get(pkt_dhcp.msg_type)
|
||||
if not dhcp_req_state:
|
||||
LOG.warning("DHCP controller received DHCPv6 with unkown "
|
||||
"type: %s from port: %s",
|
||||
pkt_dhcp.msg_type, ofport)
|
||||
return
|
||||
LOG.info("DHCP controller DHCPv6 packet received, "
|
||||
"state: %s, data: %s", dhcp_req_state, pkt_dhcp)
|
||||
self.packet_out(datapath, ofport,
|
||||
self.get_ret_packet(pkt, port_info, dhcp_req_state))
|
221
neutron/tests/unit/agent/l2/extensions/dhcp/test_ipv6.py
Normal file
221
neutron/tests/unit/agent/l2/extensions/dhcp/test_ipv6.py
Normal file
@ -0,0 +1,221 @@
|
||||
# Copyright (c) 2021 China Unicom Cloud Data Co.,Ltd.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
from neutron_lib import constants
|
||||
from os_ken.lib.packet import dhcp6
|
||||
from os_ken.lib.packet import ether_types
|
||||
from os_ken.lib.packet import ethernet
|
||||
from os_ken.lib.packet import in_proto as inet
|
||||
from os_ken.lib.packet import ipv6
|
||||
from os_ken.lib.packet import packet
|
||||
from os_ken.lib.packet import udp
|
||||
|
||||
from neutron.agent.l2.extensions.dhcp import ipv6 as dhcp_ipv6
|
||||
from neutron.tests.unit.agent.l2.extensions.dhcp \
|
||||
import test_base as dhcp_test_base
|
||||
|
||||
ONE_SEC_AFTER_2000 = dhcp_ipv6.TIME_FIRST_DAY_2000 + 1
|
||||
|
||||
|
||||
class DHCPIPv6ResponderTestCase(dhcp_test_base.DHCPResponderBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(DHCPIPv6ResponderTestCase, self).setUp()
|
||||
self.dhcp6_responer = dhcp_ipv6.DHCPIPv6Responder(self.agent_api,
|
||||
self.ext_api)
|
||||
self.dhcp6_responer.int_br = self.int_br
|
||||
|
||||
def _compare_option_values(self, expect_options, test_options):
|
||||
# os_ken dhcp.option class does not have __eq__ method so
|
||||
# compare one by one
|
||||
expected = [(option.code, option.length, option.data)
|
||||
for option in expect_options]
|
||||
test = [(option.code, option.length, option.data)
|
||||
for option in test_options]
|
||||
for i in test:
|
||||
self.assertIn(i, expected)
|
||||
|
||||
def _create_test_dhcp6_packet(self, zero_time=False):
|
||||
ret_pkt = packet.Packet()
|
||||
ret_pkt.add_protocol(
|
||||
ethernet.ethernet(
|
||||
ethertype=ether_types.ETH_TYPE_IPV6,
|
||||
dst='33:33:00:01:00:02',
|
||||
src=self.port_info['mac_address']))
|
||||
ret_pkt.add_protocol(
|
||||
ipv6.ipv6(
|
||||
src='fe80::f816:3eff:fe60:714b',
|
||||
dst='ff02::1:2',
|
||||
nxt=inet.IPPROTO_UDP))
|
||||
ret_pkt.add_protocol(
|
||||
udp.udp(
|
||||
src_port=constants.DHCPV6_RESPONSE_PORT,
|
||||
dst_port=constants.DHCPV6_CLIENT_PORT))
|
||||
|
||||
options = [dhcp6.option(
|
||||
code=1,
|
||||
data=b"\x00\x01\x00\x01",
|
||||
length=4)]
|
||||
if zero_time:
|
||||
options.append(dhcp6.option(
|
||||
code=3,
|
||||
data=b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
|
||||
length=12))
|
||||
else:
|
||||
options.append(dhcp6.option(
|
||||
code=3,
|
||||
data=b"\x01\x02\x03\x04\x05\x06\x07\x08\x0a\x0b\x0c\x0d",
|
||||
length=12))
|
||||
ret_pkt.add_protocol(dhcp6.dhcp6(
|
||||
dhcp6.DHCPV6_REQUEST, dhcp6.options(option_list=options)))
|
||||
return ret_pkt
|
||||
|
||||
def test_get_dhcpv6_client_ident(self):
|
||||
packet_in = self._create_test_dhcp6_packet()
|
||||
header_dhcp = packet_in.get_protocol(dhcp6.dhcp6)
|
||||
client_ident = self.dhcp6_responer.get_dhcpv6_client_ident(
|
||||
self.port_info['mac_address'], header_dhcp.options.option_list)
|
||||
self.assertEqual(header_dhcp.options.option_list[0].data,
|
||||
client_ident)
|
||||
|
||||
expect_ident = (
|
||||
b'\x00\x01\x00\x01\x00\x00\x00\x01\x00\x01\x02\x03\x04\x05')
|
||||
time.time = mock.Mock(return_value=ONE_SEC_AFTER_2000)
|
||||
client_ident = client_ident = (
|
||||
self.dhcp6_responer.get_dhcpv6_client_ident(
|
||||
self.port_info['mac_address'], []))
|
||||
self.assertEqual(expect_ident, client_ident)
|
||||
|
||||
def test_get_dhcpv6_server_ident(self):
|
||||
self.dhcp6_responer.get_dhcpv6_server_ident()
|
||||
|
||||
def test_get_dhcpv6_status_code(self):
|
||||
expect_status_code = b'\x00\x00success'
|
||||
status_code = self.dhcp6_responer.get_dhcpv6_status_code(
|
||||
"success", code=0)
|
||||
self.assertEqual(expect_status_code, status_code)
|
||||
|
||||
def test_get_dhcp_options(self):
|
||||
self._test_get_dhcp_options()
|
||||
|
||||
def test_get_dhcp_options_zero_time(self):
|
||||
self._test_get_dhcp_options(zero_time=True)
|
||||
|
||||
def _test_get_dhcp_options(self, zero_time=False):
|
||||
ip_info = self.dhcp6_responer.get_port_ip(self.port_info, ip_version=6)
|
||||
mac = self.port_info['mac_address']
|
||||
|
||||
option_list = [
|
||||
dhcp6.option(
|
||||
code=1,
|
||||
data=b"\x00\x01\x00\x01",
|
||||
length=4),
|
||||
dhcp6.option(
|
||||
code=2,
|
||||
data=b'\x00\x01\x00\x01\x00\x00\x00\x01\xfa\x16>\x00\x00\x00',
|
||||
length=14),
|
||||
dhcp6.option(code=13,
|
||||
data=b'\x00\x00success',
|
||||
length=9),
|
||||
dhcp6.option(
|
||||
code=23,
|
||||
data=(b'\xfd\xa7\xa5\xcc4`\x00\x01\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x01'),
|
||||
length=16),
|
||||
dhcp6.option(
|
||||
code=24,
|
||||
data=b'\x0eopenstacklocal\x00',
|
||||
length=16),
|
||||
dhcp6.option(
|
||||
code=39,
|
||||
data=b'\x03(host-fda7-a5cc-3460-1--bf.openstacklocal',
|
||||
length=42)]
|
||||
if zero_time:
|
||||
option_list.append(dhcp6.option(code=3,
|
||||
data=(b'\x00\x00\x00\x01\x00\x01Q\x80\x00\x01Q'
|
||||
b'\x80\x00\x05\x00\x18\xfd\xa7\xa5\xcc4`'
|
||||
b'\x00\x01\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\xbf\x00\x01Q\x80\x00\x01Q\x80'),
|
||||
length=40))
|
||||
else:
|
||||
option_list.append(dhcp6.option(code=3,
|
||||
data=(b'\x01\x02\x03\x04\x05\x06\x07\x08\n\x0b\x0c\r'
|
||||
b'\x00\x05\x00\x18\xfd\xa7\xa5\xcc4`'
|
||||
b'\x00\x01\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\xbf\x05\x06\x07\x08\n\x0b\x0c\r'),
|
||||
length=40))
|
||||
|
||||
test_options = dhcp6.options(
|
||||
option_list=option_list,
|
||||
options_len=0)
|
||||
|
||||
time.time = mock.Mock(return_value=ONE_SEC_AFTER_2000)
|
||||
packet_in = self._create_test_dhcp6_packet(zero_time=zero_time)
|
||||
pkt_dhcp = packet_in.get_protocol(dhcp6.dhcp6)
|
||||
dhcp_req_state = dhcp_ipv6.DHCPV6_TYPE_MAP.get(pkt_dhcp.msg_type)
|
||||
dhcp_options = self.dhcp6_responer.get_dhcp_options(
|
||||
mac, ip_info, pkt_dhcp.options.option_list, dhcp_req_state)
|
||||
self._compare_option_values(test_options.option_list,
|
||||
dhcp_options.option_list)
|
||||
|
||||
def test_get_ret_packet(self):
|
||||
packet_in = self._create_test_dhcp6_packet()
|
||||
pkt_dhcp = packet_in.get_protocol(dhcp6.dhcp6)
|
||||
dhcp_req_state = dhcp_ipv6.DHCPV6_TYPE_MAP.get(pkt_dhcp.msg_type)
|
||||
ret_packet = self.dhcp6_responer.get_ret_packet(
|
||||
packet_in, self.port_info, dhcp_req_state)
|
||||
|
||||
header_eth = ret_packet.get_protocol(ethernet.ethernet)
|
||||
header_ipv6 = ret_packet.get_protocol(ipv6.ipv6)
|
||||
header_dhcp = ret_packet.get_protocol(dhcp6.dhcp6)
|
||||
|
||||
self.assertIsNotNone(header_eth)
|
||||
self.assertIsNotNone(header_ipv6)
|
||||
self.assertIsNotNone(header_dhcp)
|
||||
|
||||
def test_get_reply_dhcp_options(self):
|
||||
mac = '00:01:02:03:04:05'
|
||||
packet_in = self._create_test_dhcp6_packet()
|
||||
header_dhcp = packet_in.get_protocol(dhcp6.dhcp6)
|
||||
time.time = mock.Mock(return_value=ONE_SEC_AFTER_2000)
|
||||
dhcp_options = self.dhcp6_responer.get_reply_dhcp_options(
|
||||
mac, message="all addresses still on link",
|
||||
req_options=header_dhcp.options.option_list)
|
||||
|
||||
test_options = dhcp6.options(option_list=[
|
||||
dhcp6.option(code=1, data=b'\x00\x01\x00\x01', length=4),
|
||||
dhcp6.option(
|
||||
code=2,
|
||||
data=b'\x00\x01\x00\x01\x00\x00\x00\x01\xfa\x16>\x00\x00\x00',
|
||||
length=14),
|
||||
dhcp6.option(code=13, data=b'\x00\x00all addresses still on link',
|
||||
length=29)],
|
||||
options_len=0)
|
||||
|
||||
self._compare_option_values(test_options.option_list,
|
||||
dhcp_options.option_list)
|
||||
|
||||
def test_handle_dhcp(self):
|
||||
self.dhcp6_responer.packet_out = mock.Mock()
|
||||
datapath = mock.Mock()
|
||||
ofport = 1
|
||||
packet_in = self._create_test_dhcp6_packet()
|
||||
self.dhcp6_responer.handle_dhcp(
|
||||
datapath, ofport, packet_in, self.port_info)
|
||||
self.dhcp6_responer.packet_out.assert_called_once_with(
|
||||
datapath, ofport, mock.ANY)
|
Loading…
x
Reference in New Issue
Block a user