d5d62c8dbf
Drop the third party mock library to use unittest mock from standard library. Change-Id: Ib64b661572e4869a24865c02a6c84a6603930394
397 lines
13 KiB
Python
397 lines
13 KiB
Python
# Copyright 2014 Rackspace, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import binascii
|
|
import socket
|
|
from unittest import mock
|
|
|
|
from oslo_config import cfg
|
|
|
|
from ironic_python_agent import netutils
|
|
from ironic_python_agent.tests.unit import base
|
|
|
|
# Captured LLDP frame, kept as hex text and decoded to bytes at import time.
# The TLV annotations below match the (type, value) pairs the tests expect
# netutils.get_lldp_info() to return for this packet.
FAKE_LLDP_PACKET = binascii.unhexlify(''.join((
    # First 14 bytes -- presumably the Ethernet header (destination MAC,
    # source MAC, ethertype); TODO confirm against netutils' parser.
    '0180c200000e885a92365a3988cc',
    # TLV type 0, length 0 -> (0, b'')
    '0000',
    # TLV type 1, length 7 -> (1, b'\x04\x88Z\x92\xecTY')
    '020704885a92ec5459',
    # TLV type 2, length 13 -> (2, b'\x05Ethernet1/18')
    '040d0545746865726e6574312f3138',
    # TLV type 3, length 2 -> (3, b'\x00x')
    '06020078',
)))
|
|
|
|
# Import the 'lldp_timeout' option definition from ironic_python_agent.config;
# the tests below read it as cfg.CONF.lldp_timeout when checking the
# arguments passed to select.select().
cfg.CONF.import_opt('lldp_timeout', 'ironic_python_agent.config')
|
|
|
|
|
|
def socket_socket_sig(family=None, type=None, proto=None):
    """Signature stand-in for ``socket.socket``, handed to mock as autospec.

    The tests pass this function as ``autospec=`` when patching
    ``socket.socket``. It takes ``family``, ``type`` and ``proto`` and does
    nothing with them.

    :returns: None
    """
    return None
|
|
|
|
|
|
class TestNetutils(base.IronicAgentTest):
    """Unit tests for ironic_python_agent.netutils.

    The LLDP and raw-socket tests replace ``socket.socket``,
    ``select.select`` and ``fcntl.ioctl`` with mocks, so no real network
    interface is touched.
    """

    def setUp(self):
        super().setUp()

    @mock.patch('fcntl.ioctl', autospec=True)
    @mock.patch('select.select', autospec=True)
    @mock.patch('socket.socket', autospec=socket_socket_sig)
    def test_get_lldp_info(self, sock_mock, select_mock, fcntl_mock):
        """Both interfaces receive a valid packet and report its TLVs."""
        expected_lldp = {
            'eth1': [
                (0, b''),
                (1, b'\x04\x88Z\x92\xecTY'),
                (2, b'\x05Ethernet1/18'),
                (3, b'\x00x')],
            'eth0': [
                (0, b''),
                (1, b'\x04\x88Z\x92\xecTY'),
                (2, b'\x05Ethernet1/18'),
                (3, b'\x00x')]
        }

        interface_names = ['eth0', 'eth1']

        sock1 = mock.Mock()
        sock1.recv.return_value = FAKE_LLDP_PACKET
        sock1.fileno.return_value = 4
        sock2 = mock.Mock()
        sock2.recv.return_value = FAKE_LLDP_PACKET
        sock2.fileno.return_value = 5

        sock_mock.side_effect = [sock1, sock2]

        select_mock.side_effect = [
            ([sock1], [], []),
            ([sock2], [], [])
        ]

        lldp_info = netutils.get_lldp_info(interface_names)
        self.assertEqual(expected_lldp, lldp_info)

        sock1.bind.assert_called_with(('eth0', netutils.LLDP_ETHERTYPE))
        sock2.bind.assert_called_with(('eth1', netutils.LLDP_ETHERTYPE))

        sock1.recv.assert_called_with(1600)
        sock2.recv.assert_called_with(1600)

        self.assertEqual(1, sock1.close.call_count)
        self.assertEqual(1, sock2.close.call_count)

        # 2 interfaces * (2 calls to enter promiscuous mode + 1 to leave) = 6
        self.assertEqual(6, fcntl_mock.call_count)

    @mock.patch('fcntl.ioctl', autospec=True)
    @mock.patch('select.select', autospec=True)
    @mock.patch('socket.socket', autospec=socket_socket_sig)
    def test_get_lldp_info_socket_recv_error(self, sock_mock, select_mock,
                                             fcntl_mock):
        """An interface whose recv() raises is absent from the result."""
        expected_lldp = {
            'eth1': [
                (0, b''),
                (1, b'\x04\x88Z\x92\xecTY'),
                (2, b'\x05Ethernet1/18'),
                (3, b'\x00x')]
        }

        interface_names = ['eth0', 'eth1']

        sock1 = mock.Mock()
        sock1.recv.side_effect = socket.error("BOOM")

        sock2 = mock.Mock()
        sock2.recv.return_value = FAKE_LLDP_PACKET
        sock2.fileno.return_value = 5

        sock_mock.side_effect = [sock1, sock2]

        select_mock.side_effect = [
            ([sock1], [], []),
            ([sock2], [], [])
        ]

        lldp_info = netutils.get_lldp_info(interface_names)
        self.assertEqual(expected_lldp, lldp_info)

        sock1.bind.assert_called_with(('eth0', netutils.LLDP_ETHERTYPE))
        sock2.bind.assert_called_with(('eth1', netutils.LLDP_ETHERTYPE))

        sock1.recv.assert_called_with(1600)
        sock2.recv.assert_called_with(1600)

        self.assertEqual(1, sock1.close.call_count)
        self.assertEqual(1, sock2.close.call_count)

        # 2 interfaces * (2 calls to enter promiscuous mode + 1 to leave) = 6
        self.assertEqual(6, fcntl_mock.call_count)

    @mock.patch('fcntl.ioctl', autospec=True)
    @mock.patch('select.select', autospec=True)
    @mock.patch('socket.socket', autospec=socket_socket_sig)
    def test_get_lldp_info_multiple(self, sock_mock, select_mock, fcntl_mock):
        """select() is called again with only the sockets still pending."""
        expected_lldp = {
            'eth1': [
                (0, b''),
                (1, b'\x04\x88Z\x92\xecTY'),
                (2, b'\x05Ethernet1/18'),
                (3, b'\x00x')],
            'eth0': [
                (0, b''),
                (1, b'\x04\x88Z\x92\xecTY'),
                (2, b'\x05Ethernet1/18'),
                (3, b'\x00x')]
        }

        interface_names = ['eth0', 'eth1']

        sock1 = mock.Mock()
        sock1.recv.return_value = FAKE_LLDP_PACKET
        sock1.fileno.return_value = 4
        sock2 = mock.Mock()
        sock2.recv.return_value = FAKE_LLDP_PACKET
        sock2.fileno.return_value = 5

        sock_mock.side_effect = [sock1, sock2]

        select_mock.side_effect = [
            ([sock1], [], []),
            ([sock2], [], []),
        ]

        lldp_info = netutils.get_lldp_info(interface_names)
        self.assertEqual(expected_lldp, lldp_info)

        sock1.bind.assert_called_with(('eth0', netutils.LLDP_ETHERTYPE))
        sock2.bind.assert_called_with(('eth1', netutils.LLDP_ETHERTYPE))

        sock1.recv.assert_called_with(1600)
        sock2.recv.assert_called_with(1600)

        self.assertEqual(1, sock1.close.call_count)
        self.assertEqual(1, sock2.close.call_count)

        # 2 interfaces * (2 calls to enter promiscuous mode + 1 to leave) = 6
        self.assertEqual(6, fcntl_mock.call_count)

        expected_calls = [
            mock.call([sock1, sock2], [], [], cfg.CONF.lldp_timeout),
            mock.call([sock2], [], [], cfg.CONF.lldp_timeout),
        ]
        self.assertEqual(expected_calls, select_mock.call_args_list)

    @mock.patch('fcntl.ioctl', autospec=True)
    @mock.patch('select.select', autospec=True)
    @mock.patch('socket.socket', autospec=socket_socket_sig)
    def test_get_lldp_info_one_empty_interface(self, sock_mock, select_mock,
                                               fcntl_mock):
        """An interface that never becomes readable gets an empty list."""
        expected_lldp = {
            'eth1': [],
            'eth0': [
                (0, b''),
                (1, b'\x04\x88Z\x92\xecTY'),
                (2, b'\x05Ethernet1/18'),
                (3, b'\x00x')]
        }

        interface_names = ['eth0', 'eth1']

        sock1 = mock.Mock()
        sock1.recv.return_value = FAKE_LLDP_PACKET
        sock1.fileno.return_value = 4
        sock2 = mock.Mock()
        sock2.fileno.return_value = 5

        sock_mock.side_effect = [sock1, sock2]

        select_mock.side_effect = [
            ([sock1], [], []),
            ([], [], []),
        ]

        lldp_info = netutils.get_lldp_info(interface_names)
        self.assertEqual(expected_lldp, lldp_info)

        sock1.bind.assert_called_with(('eth0', netutils.LLDP_ETHERTYPE))
        sock2.bind.assert_called_with(('eth1', netutils.LLDP_ETHERTYPE))

        sock1.recv.assert_called_with(1600)
        # select() never reported sock2 as readable, so it must not be read.
        # (The previous ``recv.not_called()`` was not an assertion at all.)
        sock2.recv.assert_not_called()

        self.assertEqual(1, sock1.close.call_count)
        self.assertEqual(1, sock2.close.call_count)

        # 2 interfaces * (2 calls to enter promiscuous mode + 1 to leave) = 6
        self.assertEqual(6, fcntl_mock.call_count)

    @mock.patch('fcntl.ioctl', autospec=True)
    @mock.patch('select.select', autospec=True)
    @mock.patch('socket.socket', autospec=socket_socket_sig)
    def test_get_lldp_info_empty(self, sock_mock, select_mock, fcntl_mock):
        """When no socket becomes readable every interface maps to []."""
        expected_lldp = {
            'eth1': [],
            'eth0': []
        }

        interface_names = ['eth0', 'eth1']

        sock1 = mock.Mock()
        sock1.fileno.return_value = 4
        sock2 = mock.Mock()
        sock2.fileno.return_value = 5

        sock_mock.side_effect = [sock1, sock2]

        select_mock.side_effect = [
            ([], [], []),
            ([], [], [])
        ]

        lldp_info = netutils.get_lldp_info(interface_names)
        self.assertEqual(expected_lldp, lldp_info)

        sock1.bind.assert_called_with(('eth0', netutils.LLDP_ETHERTYPE))
        sock2.bind.assert_called_with(('eth1', netutils.LLDP_ETHERTYPE))

        # Neither socket was reported readable, so neither may be read.
        # (The previous ``recv.not_called()`` was not an assertion at all.)
        sock1.recv.assert_not_called()
        sock2.recv.assert_not_called()

        self.assertEqual(1, sock1.close.call_count)
        self.assertEqual(1, sock2.close.call_count)

        # 2 interfaces * (2 calls to enter promiscuous mode + 1 to leave) = 6
        self.assertEqual(6, fcntl_mock.call_count)

    @mock.patch('fcntl.ioctl', autospec=True)
    @mock.patch('select.select', autospec=True)
    @mock.patch('socket.socket', autospec=socket_socket_sig)
    def test_get_lldp_info_malformed(self, sock_mock, select_mock, fcntl_mock):
        """Odd-sized and too-small packets both produce empty TLV lists."""
        expected_lldp = {
            'eth1': [],
            'eth0': [],
        }

        interface_names = ['eth0', 'eth1']

        sock1 = mock.Mock()
        sock1.recv.return_value = b'123456789012345'  # odd size
        sock1.fileno.return_value = 4
        sock2 = mock.Mock()
        sock2.recv.return_value = b'1234'  # too small
        sock2.fileno.return_value = 5

        sock_mock.side_effect = [sock1, sock2]

        select_mock.side_effect = [
            ([sock1], [], []),
            ([sock2], [], []),
        ]

        lldp_info = netutils.get_lldp_info(interface_names)
        self.assertEqual(expected_lldp, lldp_info)

        sock1.bind.assert_called_with(('eth0', netutils.LLDP_ETHERTYPE))
        sock2.bind.assert_called_with(('eth1', netutils.LLDP_ETHERTYPE))

        sock1.recv.assert_called_with(1600)
        sock2.recv.assert_called_with(1600)

        self.assertEqual(1, sock1.close.call_count)
        self.assertEqual(1, sock2.close.call_count)

        # 2 interfaces * (2 calls to enter promiscuous mode + 1 to leave) = 6
        self.assertEqual(6, fcntl_mock.call_count)

        expected_calls = [
            mock.call([sock1, sock2], [], [], cfg.CONF.lldp_timeout),
            mock.call([sock2], [], [], cfg.CONF.lldp_timeout),
        ]
        self.assertEqual(expected_calls, select_mock.call_args_list)

    @mock.patch('fcntl.ioctl', autospec=True)
    @mock.patch('socket.socket', autospec=socket_socket_sig)
    def test_raw_promiscuous_sockets(self, sock_mock, fcntl_mock):
        """The context manager binds each socket and closes it on exit."""
        interfaces = ['eth0', 'ens9f1']
        protocol = 3
        sock1 = mock.Mock()
        sock2 = mock.Mock()

        sock_mock.side_effect = [sock1, sock2]

        with netutils.RawPromiscuousSockets(interfaces, protocol) as sockets:
            # 2 interfaces, 1 get, 1 set call each
            self.assertEqual(4, fcntl_mock.call_count)
            self.assertEqual([('eth0', sock1), ('ens9f1', sock2)], sockets)
            sock1.bind.assert_called_once_with(('eth0', protocol))
            sock2.bind.assert_called_once_with(('ens9f1', protocol))

        # Leaving the context adds one more ioctl call per interface.
        self.assertEqual(6, fcntl_mock.call_count)

        sock1.close.assert_called_once_with()
        sock2.close.assert_called_once_with()

    @mock.patch('fcntl.ioctl', autospec=True)
    @mock.patch('socket.socket', autospec=socket_socket_sig)
    def test_raw_promiscuous_sockets_bind_fail(self, sock_mock, fcntl_mock):
        """A bind() failure propagates and both sockets are still closed."""
        interfaces = ['eth0', 'ens9f1']
        protocol = 3
        sock1 = mock.Mock()
        sock2 = mock.Mock()

        sock_mock.side_effect = [sock1, sock2]
        sock2.bind.side_effect = RuntimeError()

        def _run_with_bind_fail():
            with netutils.RawPromiscuousSockets(interfaces, protocol):
                self.fail('Unreachable code')

        self.assertRaises(RuntimeError, _run_with_bind_fail)

        sock1.bind.assert_called_once_with(('eth0', protocol))
        sock2.bind.assert_called_once_with(('ens9f1', protocol))

        self.assertEqual(6, fcntl_mock.call_count)

        sock1.close.assert_called_once_with()
        sock2.close.assert_called_once_with()

    @mock.patch('fcntl.ioctl', autospec=True)
    @mock.patch('socket.socket', autospec=socket_socket_sig)
    def test_raw_promiscuous_sockets_exception(self, sock_mock, fcntl_mock):
        """An exception raised inside the context still closes the sockets."""
        interfaces = ['eth0', 'ens9f1']
        protocol = 3
        sock1 = mock.Mock()
        sock2 = mock.Mock()

        sock_mock.side_effect = [sock1, sock2]

        def _run_with_exception():
            with netutils.RawPromiscuousSockets(interfaces, protocol):
                raise RuntimeError()

        self.assertRaises(RuntimeError, _run_with_exception)

        sock1.bind.assert_called_once_with(('eth0', protocol))
        sock2.bind.assert_called_once_with(('ens9f1', protocol))

        self.assertEqual(6, fcntl_mock.call_count)

        sock1.close.assert_called_once_with()
        sock2.close.assert_called_once_with()

    def test_wrap_ipv6(self):
        """An IPv6 address is wrapped in square brackets."""
        res = netutils.wrap_ipv6('1:2::3:4')
        self.assertEqual('[1:2::3:4]', res)

    def test_wrap_ipv6_with_ipv4(self):
        """An IPv4 address is returned unchanged."""
        res = netutils.wrap_ipv6('1.2.3.4')
        self.assertEqual('1.2.3.4', res)